Can someone please shed some light on this? I wrote the code below against Scala 2.10.5; can someone tell me whether this is the right way of doing it?
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext

class Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    sqlContext.sql("set spark.sql.shuffle.partitions=1000")
    sqlContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")

    val dataElementsFile = "hdfs://nameservice/user/ajay/spark/flds.txt"

    // deDF has only 61 rows
    val deDF = sqlContext.read.text(dataElementsFile).toDF("DataElement")
      .coalesce(1).distinct().cache()

    deDF.withColumn("ds_nm", lit("UDA"))
      .withColumn("tabl_nm", lit("TEST_DB.TEST_TABLE"))
      .collect()
      .filter(filterByDataset)
      .map(calculateMetricsAtDELevel)
      .foreach(persistResults)

    // if ds_nm starts with 'RAW_' I don't want to process it
    def filterByDataset(de: Row): Boolean = {
      val datasetName = de.getAs[String]("ds_nm").trim
      !datasetName.startsWith("RAW_")
    }

    def calculateMetricsAtDELevel(de: Row): DataFrame = {
      val dataElement = de.getAs[String]("DataElement").trim
      val datasetName = de.getAs[String]("ds_nm").trim
      val tableName = de.getAs[String]("tabl_nm").trim

      // udaDF holds 107,762,849 rows * 412 columns, stored as 105 files in HDFS, 176.5 GB * 3 replication factor
      val udaDF = sqlContext.sql("SELECT '" + datasetName + "' as ds_nm, cyc_dt, supplier_proc_i, " +
        "'" + dataElement + "' as data_elm, " + dataElement + " as data_elm_val FROM " + tableName)
      println("udaDF num Partitions: " + udaDF.toJavaRDD.getNumPartitions)
      // udaDF.toJavaRDD.getNumPartitions = 1490

      val calculatedMetrics = udaDF.groupBy("ds_nm", "cyc_dt", "supplier_proc_i", "data_elm", "data_elm_val").count()
      println("calculatedMetrics num Partitions: " + calculatedMetrics.toJavaRDD.getNumPartitions)
      // calculatedMetrics.toJavaRDD.getNumPartitions = 1000, since spark.sql.shuffle.partitions is set to 1000 above

      val adjustedSchemaDF = calculatedMetrics.withColumnRenamed("count", "derx_val_cnt")
        .withColumn("load_dt", current_timestamp())
      println("adjustedSchemaDF num Partitions: " + adjustedSchemaDF.toJavaRDD.getNumPartitions)
      // adjustedSchemaDF.toJavaRDD.getNumPartitions = 1000 as well

      adjustedSchemaDF
    }

    def persistResults(adjustedSchemaDF: DataFrame) = {
      // persist the results into a Hive table backed by Parquet
      adjustedSchemaDF.write.partitionBy("ds_nm", "cyc_dt").mode("Append")
        .insertInto("devl_df2_spf_batch.spf_supplier_trans_metric_detl_base_1")
    }
  }
}

This is my cluster configuration (Spark 1.6.0 on YARN, Cloudera 5.7.1):

Memory -> 4.10 TB
VCores -> 544

I am deploying the application in yarn-client mode and the cluster is set to use dynamic allocation (a rough sketch of what I mean by that is at the bottom of this mail, below the quoted message). Any pointers are appreciated. Thank you.

On Sat, Oct 8, 2016 at 1:17 PM, Ajay Chander <itsche...@gmail.com> wrote:

> Hi Everyone,
>
> Can anyone tell me if there is anything wrong with my code flow below?
> Based on each element from the text file I would like to run a query
> against a Hive table and persist the results in another Hive table. I want
> to do this in parallel for each element in the file. I appreciate any of
> your inputs on this.
>
> $ cat /home/ajay/flds.txt
> PHARMY_NPI_ID
> ALT_SUPPLIER_STORE_NBR
> MAIL_SERV_NBR
>
> spark-shell --name hivePersistTest --master yarn --deploy-mode client
>
> import scala.io.Source
> import org.apache.spark.sql.DataFrame
>
> val dataElementsFile = "/home/ajay/flds.txt"
> val dataElements = Source.fromFile(dataElementsFile).getLines.toArray
>
> def calculateQuery(de: String): DataFrame = {
>   val calculatedQuery = sqlContext.sql("select 'UDA' as ds_nm, cyc_dt, " +
>     "supplier_proc_i as supplier_proc_id, '" + de + "' as data_elm, " + de + " as data_elm_val, " +
>     "count(1) as derx_val_cnt, current_timestamp as load_dt " +
>     "from SPRINT2_TEST2 " +
>     "group by 'UDA', cyc_dt, supplier_proc_i, '" + de + "', " + de)
>   calculatedQuery
> }
>
> def persistResults(calculatedQuery: DataFrame) = {
>   calculatedQuery.write.insertInto("sprint2_stp1_test2")
> }
>
> dataElements.map(calculateQuery).foreach(persistResults)
>
> Thanks.
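
One thing I am considering, since the whole point is to process the data elements in parallel rather than one after another, is kicking off the per-element jobs concurrently from the driver with Futures. The following is only a rough sketch of that idea, reusing dataElements, calculateQuery and persistResults from the quoted snippet above; I have not verified how the shared HiveContext and concurrent inserts into the same target table behave under this, so treat it as an assumption rather than a working solution.

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// One Future per data element; each Future submits its own Spark job,
// so the heavy lifting still happens on the executors.
val jobs: Seq[Future[Unit]] = dataElements.toSeq.map { de =>
  Future {
    persistResults(calculateQuery(de))
  }
}

// Block the driver until every per-element job has finished.
Await.result(Future.sequence(jobs), Duration.Inf)

The default global ExecutionContext only runs as many concurrent jobs as the driver has cores; a dedicated fixed-size thread pool could be used instead if that turns out to be too many or too few.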
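
Also, to clarify what I meant by dynamic allocation above: settings along the lines of the following. This is only an illustration; the app name and executor counts are placeholders, and on our cluster most of this is managed through Cloudera Manager rather than set in code.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("MetricsAtDELevel")                      // placeholder name
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")        // external shuffle service, required for dynamic allocation on YARN
  .set("spark.dynamicAllocation.minExecutors", "10")   // illustrative bounds only
  .set("spark.dynamicAllocation.maxExecutors", "200")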