Hi Mich,

I forgot to mention that - this is the ugly part - the source data provider 
gives us (Windows) pkzip-compressed files. Will Spark uncompress these 
automatically? I haven't been able to make it work.
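
If Spark can't do it natively, I'm assuming the fallback is to unzip by hand, e.g. 
something like this rough, untested sketch (the HDFS path is just a placeholder):

import java.util.zip.ZipInputStream
import scala.io.Source

// Untested sketch: as far as I can tell there is no Hadoop codec for pkzip, so read
// each .zip as one binary file and unzip it manually. The path is a placeholder.
val lines = sc.binaryFiles("hdfs:///staging/zipped/*.zip").flatMap { case (_, stream) =>
  val zis = new ZipInputStream(stream.open())
  zis.getNextEntry                               // position on the first entry in the archive
  Source.fromInputStream(zis).getLines().toList  // materialise before the stream is closed
}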

Thanks,
Ben

> On Mar 30, 2016, at 2:27 PM, Mich Talebzadeh <mich.talebza...@gmail.com> 
> wrote:
> 
> Hi Ben,
> 
> Well, I have done this for standard CSV files downloaded from spreadsheets to a 
> staging directory on HDFS and loaded from there.
> 
> First, you may not need to unzip them: Databricks spark-csv can read them (in my 
> case) still compressed.
> 
> Check this. Mine is slightly different from what you have: first I compress my CSV 
> files with bzip2 and load them into HDFS.
> 
> #!/bin/ksh
> DIR="/data/stg/accounts/nw/10124772"
> #
> ## Compress the files
> #
> echo `date` " ""=======  Started compressing all csv FILEs"
> for FILE in `ls *.csv`
> do
>   /usr/bin/bzip2 ${FILE}
> done
> #
> ## Clear out hdfs staging directory
> #
> echo `date` " ""=======  Started deleting old files from hdfs staging directory ${DIR}"
> hdfs dfs -rm -r ${DIR}/*.bz2
> echo `date` " ""=======  Started putting bz2 files to hdfs staging directory ${DIR}"
> for FILE in `ls *.bz2`
> do
>   hdfs dfs -copyFromLocal ${FILE} ${DIR}
> done
> echo `date` " ""=======  Checking that all files are moved to hdfs staging directory"
> hdfs dfs -ls ${DIR}
> exit 0
> 
> Now you have all your csv files in the staging directory
> 
> import org.apache.spark.sql.functions._
> import java.sql.{Date, Timestamp}
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> 
> val df = sqlContext.read
>   .format("com.databricks.spark.csv")
>   .option("inferSchema", "true")
>   .option("header", "true")
>   .load("hdfs://rhes564:9000/data/stg/accounts/nw/10124772")
> case class Accounts(TransactionDate: String, TransactionType: String, Description: String,
>   Value: Double, Balance: Double, AccountName: String, AccountNumber: String)
> // Map the columns to names
> //
> val a = df.filter(col("Date") > "").map(p =>
>   Accounts(p(0).toString, p(1).toString, p(2).toString, p(3).toString.toDouble,
>     p(4).toString.toDouble, p(5).toString, p(6).toString))
> //
> // Create a Spark temporary table
> //
> a.toDF.registerTempTable("tmp")
> 
> // Need to create and populate target ORC table nw_10124772 in database accounts in Hive
> //
> sql("use accounts")
> //
> // Drop and create table nw_10124772
> //
> sql("DROP TABLE IF EXISTS accounts.nw_10124772")
> var sqltext : String = ""
> sqltext = """
> CREATE TABLE accounts.nw_10124772 (
> TransactionDate            DATE
> ,TransactionType           String
> ,Description               String
> ,Value                     Double
> ,Balance                   Double
> ,AccountName               String
> ,AccountNumber             Int
> )
> COMMENT 'from csv file from excel sheet'
> STORED AS ORC
> TBLPROPERTIES ( "orc.compress"="ZLIB" )
> """
> sql(sqltext)
> //
> // Put data in Hive table. Clean up is already done
> //
> sqltext = """
> INSERT INTO TABLE accounts.nw_10124772
> SELECT
>           TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(TransactionDate,'dd/MM/yyyy'),'yyyy-MM-dd')) AS TransactionDate
>         , TransactionType
>         , Description
>         , Value
>         , Balance
>         , AccountName
>         , AccountNumber
> FROM tmp
> """
> sql(sqltext)
> 
> println ("\nFinished at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> 
> Once you store the data in some form of table (Parquet, ORC, etc.) you can do 
> whatever you like with it.
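> 
> For example (just a sketch), once the ORC table is there you can read it back and 
> aggregate it from Spark like any other Hive table:
> 
> // sketch: query the ORC table created above
> sqlContext.sql("SELECT AccountName, SUM(Value) AS total FROM accounts.nw_10124772 GROUP BY AccountName").show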
> 
> HTH
> 
> Dr Mich Talebzadeh
> 
> LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> 
> http://talebzadehmich.wordpress.com
> 
> On 30 March 2016 at 22:13, Benjamin Kim <bbuil...@gmail.com> wrote:
> Hi Mich,
> 
> You are correct. I am talking about the Databricks package spark-csv you have 
> below.
> 
> The files are stored in S3, and I download, unzip, and store each one of them 
> in a variable as a string using the AWS SDK (aws-java-sdk-1.10.60.jar).
> 
> Here is some of the code.
> 
> import com.amazonaws.auth.EnvironmentVariableCredentialsProvider
> import com.amazonaws.services.s3.AmazonS3Client
> import com.amazonaws.services.s3.model.GetObjectRequest
> import java.util.zip.ZipInputStream
> 
> val filesRdd = sc.parallelize(lFiles, 250)
> filesRdd.foreachPartition(files => {
>   // one S3 client per partition, so it does not have to be serialised
>   val s3Client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider())
>   files.foreach(file => {
>     val s3Object = s3Client.getObject(new GetObjectRequest(s3Bucket, file))
>     val zipFile = new ZipInputStream(s3Object.getObjectContent())
>     val csvFile = readZipStream(zipFile)
>   })
> })
> 
> This function does the unzipping and converts to string.
> 
> import java.util.Scanner
> import scala.collection.mutable.ListBuffer
> 
> // Reads the first entry of the zip and returns its contents as one newline-joined string
> def readZipStream(stream: ZipInputStream): String = {
>   stream.getNextEntry
>   var stuff = new ListBuffer[String]()
>   val scanner = new Scanner(stream)
>   while(scanner.hasNextLine){
>     stuff += scanner.nextLine
>   }
>   stuff.toList.mkString("\n")
> }
> 
> The next step is to parse the CSV string and convert it to a DataFrame, which 
> will populate a Hive/HBase table.
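> 
> Something like this is roughly what I am picturing (untested; I am assuming spark-csv's 
> CsvParser can take an RDD[String] via csvRdd, and csvStrings below is just a placeholder 
> for the collected strings):
> 
> import com.databricks.spark.csv.CsvParser
> 
> // sketch only: split the unzipped strings into lines and let spark-csv parse them;
> // each file's own header line would still need handling
> val csvLines = sc.parallelize(csvStrings).flatMap(_.split("\n"))
> val df = new CsvParser().withUseHeader(true).csvRdd(sqlContext, csvLines)
> df.registerTempTable("tmp")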
> 
> If you can help, I would be truly grateful.
> 
> Thanks,
> Ben
> 
> 
>> On Mar 30, 2016, at 2:06 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>> 
>> Just to clarify, are you talking about the Databricks CSV package?
>> 
>> $SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
>> 
>> Where are these zipped files? Are they copied to a staging directory in hdfs?
>> 
>> HTH
>> 
>> Dr Mich Talebzadeh
>> 
>> LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> 
>> http://talebzadehmich.wordpress.com
>>  
>> 
>> On 30 March 2016 at 15:17, Benjamin Kim <bbuil...@gmail.com> wrote:
>> I have a quick question. I have downloaded multiple zipped files from S3 and 
>> unzipped each one of them into strings. The next step is to parse them with a 
>> CSV parser. Is there a way to easily use the spark-csv package for this?
>> 
>> Thanks,
>> Ben
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 
>> 
> 
> 
