Well, my guess is: just pkunzip it, then either recompress it with bzip2 or leave it as it is. Databricks handles *.bz2 type files; I know that. Anyway, that is the easy part :)
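For the unzipped in-memory strings described further down the thread, one route that may avoid writing anything back to disk is spark-csv's programmatic CsvParser, which (in the 1.x releases, if I remember right) accepts an RDD[String] via its csvRdd method. A minimal sketch, assuming a hypothetical csvContent variable holding one whole unzipped file, header line included:

import com.databricks.spark.csv.CsvParser

// csvContent is assumed to hold the unzipped file as a single string
// turn the in-memory file into an RDD of lines
val linesRdd = sc.parallelize(csvContent.split("\n"))

// let spark-csv parse the lines and infer column types
val df = new CsvParser()
  .withUseHeader(true)
  .withInferSchema(true)
  .csvRdd(sqlContext, linesRdd)

df.printSchema()

Note that this has to run on the driver: a DataFrame cannot be built inside foreachPartition on the executors, so the per-partition S3 downloads shown below would need to hand their strings back (for example via mapPartitions instead of foreachPartition) before parsing.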
Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 31 March 2016 at 01:02, Benjamin Kim <bbuil...@gmail.com> wrote:

> Hi Mich,
>
> I forgot to mention that - this is the ugly part - the source data
> provider gives us (Windows) pkzip compressed files. Will Spark uncompress
> these automatically? I haven't been able to make it work.
>
> Thanks,
> Ben
>
> On Mar 30, 2016, at 2:27 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
> Hi Ben,
>
> Well, I have done it for standard csv files downloaded from spreadsheets
> to a staging directory on hdfs and loaded them from there.
>
> First, you may not need to unzip them. Databricks can read them (in my
> case) as zipped files.
>
> Check this. Mine is slightly different from what you have. First I zip my
> csv files with bzip2 and load them into hdfs:
>
> #!/bin/ksh
> DIR="/data/stg/accounts/nw/10124772"
> #
> ## Compress the files
> #
> echo `date` "======= Started compressing all csv files"
> for FILE in *.csv
> do
>   /usr/bin/bzip2 ${FILE}
> done
> #
> ## Clear out the hdfs staging directory
> #
> echo `date` "======= Started deleting old files from hdfs staging directory ${DIR}"
> hdfs dfs -rm -r ${DIR}/*.bz2
> echo `date` "======= Started putting bz2 files to hdfs staging directory ${DIR}"
> for FILE in *.bz2
> do
>   hdfs dfs -copyFromLocal ${FILE} ${DIR}
> done
> echo `date` "======= Checking that all files are moved to hdfs staging directory"
> hdfs dfs -ls ${DIR}
> exit 0
>
> Now you have all your csv files in the staging directory:
>
> import org.apache.spark.sql.functions._
> import java.sql.{Date, Timestamp}
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println("\nStarted at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
>
> // Read the csv files with the Databricks csv package
> val df = sqlContext.read.format("com.databricks.spark.csv")
>   .option("inferSchema", "true")
>   .option("header", "true")
>   .load("hdfs://rhes564:9000/data/stg/accounts/nw/10124772")
> case class Accounts(TransactionDate: String, TransactionType: String,
>   Description: String, Value: Double, Balance: Double,
>   AccountName: String, AccountNumber: String)
> //
> // Map the columns to names
> //
> val a = df.filter(col("Date") > "").map(p =>
>   Accounts(p(0).toString, p(1).toString, p(2).toString,
>     p(3).toString.toDouble, p(4).toString.toDouble,
>     p(5).toString, p(6).toString))
> //
> // Create a Spark temporary table
> //
> a.toDF.registerTempTable("tmp")
>
> //
> // Create and populate the target ORC table nw_10124772 in the Hive database accounts
> //
> sql("use accounts")
> //
> // Drop and create table nw_10124772
> //
> sql("DROP TABLE IF EXISTS accounts.nw_10124772")
> var sqltext: String = ""
> sqltext = """
> CREATE TABLE accounts.nw_10124772 (
>   TransactionDate DATE
>  ,TransactionType String
>  ,Description     String
>  ,Value           Double
>  ,Balance         Double
>  ,AccountName     String
>  ,AccountNumber   Int
> )
> COMMENT 'from csv file from excel sheet'
> STORED AS ORC
> TBLPROPERTIES ( "orc.compress"="ZLIB" )
> """
> sql(sqltext)
> //
> // Put data in the Hive table. Clean up is already done
> //
> sqltext = """
> INSERT INTO TABLE accounts.nw_10124772
> SELECT
>   TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(TransactionDate,'dd/MM/yyyy'),'yyyy-MM-dd')) AS TransactionDate
> , TransactionType
> , Description
> , Value
> , Balance
> , AccountName
> , AccountNumber
> FROM tmp
> """
> sql(sqltext)
>
> println("\nFinished at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
>
> Once you store the data in some form of table (Parquet, ORC, etc.) you can
> do whatever you like with it.
>
> HTH
>
> Dr Mich Talebzadeh
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
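A note on the compression claim above: Hadoop ships a splittable codec for bzip2 (and a codec for gzip), so the Databricks csv package reads *.bz2 files transparently with no extra options, whereas there is no built-in Hadoop codec for pkzip archives, which is why those have to be unzipped (or recompressed) first. A sketch reusing the staging path from the script above, assuming the files were bzip2-compressed as shown; the *.bz2 glob is illustrative:

val bz2Df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("hdfs://rhes564:9000/data/stg/accounts/nw/10124772/*.bz2")  // decompressed on the fly by Hadoop's BZip2Codec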
> On 30 March 2016 at 22:13, Benjamin Kim <bbuil...@gmail.com> wrote:
>
>> Hi Mich,
>>
>> You are correct. I am talking about the Databricks package spark-csv you
>> have below.
>>
>> The files are stored in S3, and I download, unzip, and store each one of
>> them in a variable as a string using the AWS SDK (aws-java-sdk-1.10.60.jar).
>>
>> Here is some of the code:
>>
>> val filesRdd = sc.parallelize(lFiles, 250)
>> filesRdd.foreachPartition(files => {
>>   val s3Client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider())
>>   files.foreach(file => {
>>     val s3Object = s3Client.getObject(new GetObjectRequest(s3Bucket, file))
>>     val zipFile = new ZipInputStream(s3Object.getObjectContent())
>>     val csvFile = readZipStream(zipFile)
>>   })
>> })
>>
>> This function does the unzipping and converts the stream to a string:
>>
>> def readZipStream(stream: ZipInputStream): String = {
>>   stream.getNextEntry
>>   var stuff = new ListBuffer[String]()
>>   val scanner = new Scanner(stream)
>>   while (scanner.hasNextLine) {
>>     stuff += scanner.nextLine
>>   }
>>   stuff.toList.mkString("\n")
>> }
>>
>> The next step is to parse the CSV string and convert it to a dataframe,
>> which will populate a Hive/HBase table.
>>
>> If you can help, I would be truly grateful.
>>
>> Thanks,
>> Ben
>>
>> On Mar 30, 2016, at 2:06 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>> Just to clarify, are you talking about the Databricks csv package?
>>
>> $SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
>>
>> Where are these zipped files? Are they copied to a staging directory in hdfs?
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>> http://talebzadehmich.wordpress.com
>>
>> On 30 March 2016 at 15:17, Benjamin Kim <bbuil...@gmail.com> wrote:
>>
>>> I have a quick question. I have downloaded multiple zipped files from S3
>>> and unzipped each one of them into strings. The next step is to parse
>>> them using a CSV parser. I want to know if there is a way to easily use
>>> the spark csv package for this?
>>>
>>> Thanks,
>>> Ben
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
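One caveat on the readZipStream helper quoted above: it calls getNextEntry only once, so just the first file inside each archive is read. If an archive can contain more than one entry, a variant along these lines may be safer (a sketch, not the code from this thread, assuming every entry is a plain text file):

import java.util.zip.ZipInputStream
import java.util.Scanner
import scala.collection.mutable.ListBuffer

// suggested variant of readZipStream that reads every entry in the archive
def readZipStreamAll(stream: ZipInputStream): String = {
  val lines = new ListBuffer[String]()
  var entry = stream.getNextEntry
  while (entry != null) {
    // ZipInputStream bounds each entry, so the Scanner
    // sees end-of-input at the entry boundary
    val scanner = new Scanner(stream)
    while (scanner.hasNextLine) {
      lines += scanner.nextLine
    }
    entry = stream.getNextEntry
  }
  lines.mkString("\n")
}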