Whatever you do, the lion's share of the time is going to be taken by the insert into the Hive table.
OK, check this. It takes CSV files and inserts them into a Hive ORC table. This version uses the Hive on Spark engine and is written in HiveQL, executed via beeline:

--1 Move the .csv data into HDFS
--2 Create an external table
--3 Create the ORC table
--4 Insert the data from the external table into the Hive ORC table

select from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS StartTime;
set hive.exec.reducers.max=256;
use accounts;
--set hive.execution.engine=mr;

--2) Create the external staging table
DROP TABLE IF EXISTS stg_t2;
CREATE EXTERNAL TABLE stg_t2 (
   INVOICENUMBER string
  ,PAYMENTDATE string
  ,NET string
  ,VAT string
  ,TOTAL string
)
COMMENT 'from csv file from excel sheet nw_10124772'
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE
LOCATION '/data/stg/accounts/nw/10124772'
TBLPROPERTIES ("skip.header.line.count"="1")
;

--3) Create the ORC table
DROP TABLE IF EXISTS t2;
CREATE TABLE t2 (
   INVOICENUMBER INT
  ,PAYMENTDATE date
  ,NET DECIMAL(20,2)
  ,VAT DECIMAL(20,2)
  ,TOTAL DECIMAL(20,2)
)
COMMENT 'from csv file from excel sheet nw_10124772'
CLUSTERED BY (INVOICENUMBER) INTO 256 BUCKETS
STORED AS ORC
TBLPROPERTIES ("orc.compress"="ZLIB")
;

--4) Put data in the target table: do the conversion and ignore empty rows
INSERT INTO TABLE t2
SELECT
  INVOICENUMBER
, TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(paymentdate,'dd/MM/yyyy'),'yyyy-MM-dd')) AS paymentdate
--, CAST(REGEXP_REPLACE(SUBSTR(net,2,20),",","") AS DECIMAL(20,2))
, CAST(REGEXP_REPLACE(net,'[^\\d\\.]','') AS DECIMAL(20,2))
, CAST(REGEXP_REPLACE(vat,'[^\\d\\.]','') AS DECIMAL(20,2))
, CAST(REGEXP_REPLACE(total,'[^\\d\\.]','') AS DECIMAL(20,2))
FROM stg_t2
WHERE
--  INVOICENUMBER > 0 AND
    CAST(REGEXP_REPLACE(total,'[^\\d\\.]','') AS DECIMAL(20,2)) > 0.0  -- Exclude empty rows
;

select from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS EndTime;
!exit

And similar using the Spark shell and a temporary table:

import org.apache.spark.sql.functions._
import java.sql.{Date, Timestamp}
val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println("\nStarted at")
sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
//
// Get a DF first, based on the Databricks CSV library; do not infer the column heading because of a column called "Type"
//
val df = sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", "true").option("header", "true").load("hdfs://rhes564:9000/data/stg/accounts/nw/10124772")
//
// [Date: string, Type: string, Description: string, Value: double, Balance: double, Account Name: string, Account Number: string]
//
case class Accounts(TransactionDate: String, TransactionType: String, Description: String, Value: Double, Balance: Double, AccountName: String, AccountNumber: String)
//
// Map the columns to names
//
val a = df.filter(col("Date") > "").map(p => Accounts(p(0).toString, p(1).toString, p(2).toString, p(3).toString.toDouble, p(4).toString.toDouble, p(5).toString, p(6).toString))
//
// Create a Spark temporary table
//
a.toDF.registerTempTable("tmp")
//
// Test it here
//
//sql("select TransactionDate, TransactionType, Description, Value, Balance, AccountName, AccountNumber from tmp").take(2)
//
// Need to create and populate the target ORC table nw_10124772 in database accounts in Hive
//
sql("use accounts")
//
// Drop and create table nw_10124772
//
sql("DROP TABLE IF EXISTS accounts.nw_10124772")
var sqltext: String = ""
sqltext = """
CREATE TABLE accounts.nw_10124772 (
   TransactionDate DATE
  ,TransactionType String
  ,Description String
  ,Value Double
  ,Balance Double
  ,AccountName String
  ,AccountNumber Int
)
COMMENT 'from csv file from excel sheet'
STORED AS ORC
TBLPROPERTIES ("orc.compress"="ZLIB")
"""
sql(sqltext)
//
// Put data in the Hive table. Clean-up is already done
//
sqltext = """
INSERT INTO TABLE accounts.nw_10124772
SELECT
  TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(TransactionDate,'dd/MM/yyyy'),'yyyy-MM-dd')) AS TransactionDate
, TransactionType
, Description
, Value
, Balance
, AccountName
, AccountNumber
FROM tmp
"""
sql(sqltext)
//
// Test all went OK by looking at some old transactions
//
sql("Select TransactionDate, Value, Balance from nw_10124772 where TransactionDate < '2011-05-30'").collect.foreach(println)
//
println("\nFinished at")
sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
sys.exit()

Anyway, worth trying. HTH

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 22 May 2016 at 20:49, swetha kasireddy <swethakasire...@gmail.com> wrote:

> I am doing 1. currently, using the following, and it takes a lot of
> time. What's the advantage of doing 2, and how do I do it?
>
> sqlContext.sql(" CREATE EXTERNAL TABLE IF NOT EXISTS records (id STRING,
> record STRING) PARTITIONED BY (datePartition STRING, idPartition STRING)
> stored as ORC LOCATION '/user/users' ")
> sqlContext.sql(" orc.compress= SNAPPY")
> sqlContext.sql(
> """ from recordsTemp ps insert overwrite table users
> partition(datePartition , idPartition ) select ps.id, ps.record,
> ps.datePartition, ps.idPartition """.stripMargin)
>
> On Sun, May 22, 2016 at 12:47 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Two alternatives for this ETL or ELT:
>>
>> 1. There is only one external ORC table and you do insert overwrite
>> into that external table through Spark SQL, or
>> 2. 14k files loaded into a staging area/read directory and then insert
>> overwrite into an ORC table and th
>>
>> On 22 May 2016 at 20:38, swetha kasireddy <swethakasire...@gmail.com>
>> wrote:
>>
>>> Around 14000 partitions need to be loaded every hour. Yes, I tested this,
>>> and it is taking a lot of time to load. A partition would look something like
>>> the following, which is further partitioned by userId with all the
>>> userRecords for that date inside it.
>>>
>>> 5 2016-05-20 16:03 /user/user/userRecords/dtPartitioner=2012-09-12
>>>
>>> On Sun, May 22, 2016 at 12:30 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> By partition, do you mean 14000 files loaded in each batch session (say
>>>> daily)? Have you actually tested this?
>>>>
>>>> On 22 May 2016 at 20:24, swetha kasireddy <swethakasire...@gmail.com>
>>>> wrote:
>>>>
>>>>> The data is not very big, say 1 MB-10 MB at the max per partition. What
>>>>> is the best way to insert these 14k partitions with decent performance?
>>>>>
>>>>> On Sun, May 22, 2016 at 12:18 PM, Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> The acid question is how many rows you are going to insert in a batch
>>>>>> session. BTW, if this is purely an SQL operation then you can do all
>>>>>> that in Hive running on the Spark engine. It will be very fast as well.
>>>>>>
>>>>>> On 22 May 2016 at 20:14, Jörn Franke <jornfra...@gmail.com> wrote:
>>>>>>
>>>>>>> 14000 partitions seem to be way too many to be performant (except
>>>>>>> for large data sets). How much data does one partition contain?
>>>>>>>
>>>>>>> > On 22 May 2016, at 09:34, SRK <swethakasire...@gmail.com> wrote:
>>>>>>> >
>>>>>>> > Hi,
>>>>>>> >
>>>>>>> > In my Spark SQL query to insert data, I have around 14,000 partitions of
>>>>>>> > data, which seems to be causing memory issues. How can I insert the data for
>>>>>>> > 100 partitions at a time to avoid any memory issues?
>>>>>>> >
>>>>>>> > --
>>>>>>> > View this message in context:
>>>>>>> > http://apache-spark-user-list.1001560.n3.nabble.com/How-to-insert-data-for-100-partitions-at-a-time-using-Spark-SQL-tp26997.html
>>>>>>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>>>> >
>>>>>>> > ---------------------------------------------------------------------
>>>>>>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>> > For additional commands, e-mail: user-h...@spark.apache.org
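The "100 partitions at a time" idea the original question asks about can be sketched in the same Spark-shell style as the code earlier in the thread. This is a minimal sketch under assumptions: it reuses the table and column names from swetha's snippet (recordsTemp, users, datePartition, idPartition), and batchedInsertSql is a hypothetical helper, not a Spark API. It groups the distinct datePartition values into batches and emits one Hive FROM ... INSERT OVERWRITE statement per batch, so no single statement touches all 14k partitions; in a real session each generated statement would be passed to sqlContext.sql(...).

```scala
object BatchedPartitionInsert {
  // Group the partition values into batches of `batchSize` and build one
  // INSERT OVERWRITE statement per batch, restricted by a WHERE ... IN list.
  def batchedInsertSql(partitions: Seq[String], batchSize: Int): Seq[String] =
    partitions.grouped(batchSize).map { batch =>
      val inList = batch.map(p => s"'$p'").mkString(", ")
      s"""FROM recordsTemp ps
         |INSERT OVERWRITE TABLE users PARTITION (datePartition, idPartition)
         |SELECT ps.id, ps.record, ps.datePartition, ps.idPartition
         |WHERE ps.datePartition IN ($inList)""".stripMargin
    }.toSeq

  def main(args: Array[String]): Unit = {
    // 250 hypothetical partition values -> 3 statements of at most 100 each.
    val parts = (1 to 250).map(i => s"p$i")
    val stmts = batchedInsertSql(parts, 100)
    println(stmts.size)
    println(stmts.head)
    // Each element of stmts would then be run as sqlContext.sql(stmt).
  }
}
```

Whether batches of 100 actually avoid the memory issue depends on the dynamic-partition settings and cluster memory, but splitting the statement this way caps how many partition writers any one insert opens at once.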