Hi Ben,

Well, I have done this for standard CSV files downloaded from spreadsheets to a staging directory on HDFS and loaded from there.
First, you may not need to unzip them. Databricks spark-csv can read zipped files as well (bzip2 in my case). Check this. Mine is slightly different from what you have. First I compress my CSV files with bzip2 and load them into HDFS:

#!/bin/ksh
DIR="/data/stg/accounts/nw/10124772"
#
## Compress the files
#
echo `date` " ""======= Started compressing all csv FILEs"
for FILE in `ls *.csv`
do
  /usr/bin/bzip2 ${FILE}
done
#
## Clear out hdfs staging directory
#
echo `date` " ""======= Started deleting old files from hdfs staging directory ${DIR}"
hdfs dfs -rm -r ${DIR}/*.bz2
echo `date` " ""======= Started putting bz2 files to hdfs staging directory ${DIR}"
for FILE in `ls *.bz2`
do
  hdfs dfs -copyFromLocal ${FILE} ${DIR}
done
echo `date` " ""======= Checking that all files are moved to hdfs staging directory"
hdfs dfs -ls ${DIR}
exit 0

Now you have all your CSV files in the staging directory:

import org.apache.spark.sql.functions._
import java.sql.{Date, Timestamp}
val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println("\nStarted at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
val df = sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", "true").option("header", "true").load("hdfs://rhes564:9000/data/stg/accounts/nw/10124772")
case class Accounts(TransactionDate: String, TransactionType: String, Description: String, Value: Double, Balance: Double, AccountName: String, AccountNumber: String)
//
// Map the columns to names
//
val a = df.filter(col("Date") > "").map(p => Accounts(p(0).toString, p(1).toString, p(2).toString, p(3).toString.toDouble, p(4).toString.toDouble, p(5).toString, p(6).toString))
//
// Create a Spark temporary table
//
a.toDF.registerTempTable("tmp")
//
// Need to create and populate target ORC table nw_10124772 in database accounts in Hive
//
sql("use accounts")
//
// Drop and create table nw_10124772
//
sql("DROP TABLE IF EXISTS accounts.nw_10124772")
var sqltext: String = ""
sqltext = """
CREATE TABLE accounts.nw_10124772 (
  TransactionDate  DATE
, TransactionType  String
, Description      String
, Value            Double
, Balance          Double
, AccountName      String
, AccountNumber    Int
)
COMMENT 'from csv file from excel sheet'
STORED AS ORC
TBLPROPERTIES ( "orc.compress"="ZLIB" )
"""
sql(sqltext)
//
// Put data in Hive table. Clean up is already done
//
sqltext = """
INSERT INTO TABLE accounts.nw_10124772
SELECT
  TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(TransactionDate,'dd/MM/yyyy'),'yyyy-MM-dd')) AS TransactionDate
, TransactionType
, Description
, Value
, Balance
, AccountName
, AccountNumber
FROM tmp
"""
sql(sqltext)
println("\nFinished at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)

Once you store the data in some form of table (Parquet, ORC, etc.) you can do whatever you like with it.

HTH

Dr Mich Talebzadeh

LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 30 March 2016 at 22:13, Benjamin Kim <bbuil...@gmail.com> wrote:

> Hi Mich,
>
> You are correct. I am talking about the Databricks package spark-csv you
> have below.
>
> The files are stored in S3 and I download, unzip, and store each one of
> them in a variable as a string using the AWS SDK (aws-java-sdk-1.10.60.jar).
>
> Here is some of the code.
>
> val filesRdd = sc.parallelize(lFiles, 250)
> filesRdd.foreachPartition(files => {
>   val s3Client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider())
>   files.foreach(file => {
>     val s3Object = s3Client.getObject(new GetObjectRequest(s3Bucket, file))
>     val zipFile = new ZipInputStream(s3Object.getObjectContent())
>     val csvFile = readZipStream(zipFile)
>   })
> })
>
> This function does the unzipping and converts to string.
>
> def readZipStream(stream: ZipInputStream): String = {
>   stream.getNextEntry
>   var stuff = new ListBuffer[String]()
>   val scanner = new Scanner(stream)
>   while (scanner.hasNextLine) {
>     stuff += scanner.nextLine
>   }
>   stuff.toList.mkString("\n")
> }
>
> The next step is to parse the CSV string and convert to a dataframe, which
> will populate a Hive/HBase table.
>
> If you can help, I would be truly grateful.
>
> Thanks,
> Ben
>
>
> On Mar 30, 2016, at 2:06 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
> just to clarify are you talking about databricks csv package.
>
> $SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
>
> Where are these zipped files? Are they copied to a staging directory in
> hdfs?
>
> HTH
>
> Dr Mich Talebzadeh
>
> LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
>
> On 30 March 2016 at 15:17, Benjamin Kim <bbuil...@gmail.com> wrote:
>
>> I have a quick question. I have downloaded multiple zipped files from S3
>> and unzipped each one of them into strings. The next step is to parse using
>> a CSV parser. I want to know if there is a way to easily use the spark csv
>> package for this?
>>
>> Thanks,
>> Ben
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
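
[Editor's note] Since Ben already holds each unzipped CSV as an in-memory string, the staging-to-HDFS step can in principle be skipped. A minimal sketch of that parse step, assuming a running `sc`/`sqlContext` with spark-csv 1.3+ on the classpath, and assuming the version in use exposes `CsvParser.csvRdd` (check your spark-csv release before relying on it); `csvString` stands in for the value Ben's `readZipStream` returns:

```scala
import com.databricks.spark.csv.CsvParser

// csvString is the in-memory CSV text produced by readZipStream (hypothetical value here)
val csvString: String = readZipStream(zipFile)

// Split the string into lines and distribute them as an RDD[String]
val linesRdd = sc.parallelize(csvString.split("\n").toSeq)

// Let spark-csv parse the RDD directly -- no staging directory needed
val df = new CsvParser()
  .withUseHeader(true)
  .withInferSchema(true)
  .csvRdd(sqlContext, linesRdd)

// From here the flow is the same as Mich's: register a temp table,
// then INSERT ... SELECT into the ORC-backed Hive table.
df.registerTempTable("tmp")
```

One caveat with this approach: `withInferSchema(true)` triggers an extra pass over the data, so for many small files it may be cheaper to declare the schema explicitly.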