Hi Ben,

Well, I have done it for standard csv files downloaded from spreadsheets to a staging directory on hdfs and loaded from there.

First, you may not need to unzip them. The Databricks csv package can read compressed files (bzip2 in my case) directly.
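For example, a minimal sketch (assuming the spark-csv package is on the classpath; the file name here is hypothetical):

val dfbz = sqlContext.read.
  format("com.databricks.spark.csv").
  option("inferSchema", "true").
  option("header", "true").
  load("hdfs://rhes564:9000/data/stg/accounts/nw/10124772/statement.csv.bz2")  // hypothetical bzip2-compressed csv in the staging directory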

Check this. Mine is slightly different from what you have. First I compress my csv files with bzip2 and load them into hdfs:

#!/bin/ksh
DIR="/data/stg/accounts/nw/10124772"
#
## Compress the files
#
echo `date` " ""=======  Started compressing all csv files"
for FILE in `ls *.csv`
do
  /usr/bin/bzip2 ${FILE}
done
#
## Clear out hdfs staging directory
#
echo `date` " ""=======  Started deleting old files from hdfs staging directory ${DIR}"
hdfs dfs -rm -r ${DIR}/*.bz2
echo `date` " ""=======  Started putting bz2 files to hdfs staging directory ${DIR}"
for FILE in `ls *.bz2`
do
  hdfs dfs -copyFromLocal ${FILE} ${DIR}
done
echo `date` " ""=======  Checking that all files are moved to hdfs staging directory"
hdfs dfs -ls ${DIR}
exit 0
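As an aside, instead of the copy loop you could put all the bz2 files in one go with a glob (same ${DIR} as above):

hdfs dfs -put *.bz2 ${DIR}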

Now you have all your csv files in the hdfs staging directory.
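The Scala below is run in spark-shell and assumes the shell was started with the Databricks csv package, e.g.:

$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0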

import org.apache.spark.sql.functions._
import java.sql.{Date, Timestamp}
// Use a HiveContext so the Hive DDL/DML below works; import sql so it can be called directly
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext.implicits._
import sqlContext.sql
println ("\nStarted at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)

val df = sqlContext.read.
  format("com.databricks.spark.csv").
  option("inferSchema", "true").
  option("header", "true").
  load("hdfs://rhes564:9000/data/stg/accounts/nw/10124772")

case class Accounts(TransactionDate: String, TransactionType: String, Description: String, Value: Double, Balance: Double, AccountName: String, AccountNumber: String)
//
// Map the columns to names
//
val a = df.filter(col("Date") > "").map(p =>
  Accounts(p(0).toString, p(1).toString, p(2).toString, p(3).toString.toDouble, p(4).toString.toDouble, p(5).toString, p(6).toString))
//
// Create a Spark temporary table
//
a.toDF.registerTempTable("tmp")

// Need to create and populate the target ORC table nw_10124772 in the Hive database accounts
//
sql("use accounts")
//
// Drop and create table nw_10124772
//
sql("DROP TABLE IF EXISTS accounts.nw_10124772")
var sqltext : String = ""
sqltext = """
CREATE TABLE accounts.nw_10124772 (
TransactionDate            DATE
,TransactionType           String
,Description               String
,Value                     Double
,Balance                   Double
,AccountName               String
,AccountNumber             Int
)
COMMENT 'from csv file from excel sheet'
STORED AS ORC
TBLPROPERTIES ( "orc.compress"="ZLIB" )
"""
sql(sqltext)
//
// Put data in Hive table. Clean up is already done
//
sqltext = """
INSERT INTO TABLE accounts.nw_10124772
SELECT
          TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(TransactionDate,'dd/MM/yyyy'),'yyyy-MM-dd')) AS TransactionDate
        , TransactionType
        , Description
        , Value
        , Balance
        , AccountName
        , AccountNumber
FROM tmp
"""
sql(sqltext)

println ("\nFinished at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)

Once you store it in some form of table (Parquet, ORC, etc.) you can do whatever you like with it.
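For example, a quick sanity check on the ORC table (just a sketch; the columns follow the DDL above):

sql("SELECT COUNT(1) FROM accounts.nw_10124772").collect.foreach(println)
sql("SELECT TransactionDate, Description, Value, Balance FROM accounts.nw_10124772").show(10)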

HTH

Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 30 March 2016 at 22:13, Benjamin Kim <bbuil...@gmail.com> wrote:

> Hi Mich,
>
> You are correct. I am talking about the Databricks package spark-csv you
> have below.
>
> The files are stored in s3 and I download, unzip, and store each one of
> them in a variable as a string using the AWS SDK (aws-java-sdk-1.10.60.jar).
>
> Here is some of the code.
>
> val filesRdd = sc.parallelize(lFiles, 250)
> filesRdd.foreachPartition(files => {
>   val s3Client = new AmazonS3Client(new
> EnvironmentVariableCredentialsProvider())
>   files.foreach(file => {
>     val s3Object = s3Client.getObject(new GetObjectRequest(s3Bucket, file))
>     val zipFile = new ZipInputStream(s3Object.getObjectContent())
>     val csvFile = readZipStream(zipFile)
>   })
> })
>
> This function does the unzipping and converts to string.
>
> def readZipStream(stream: ZipInputStream): String = {
>   stream.getNextEntry
>   var stuff = new ListBuffer[String]()
>   val scanner = new Scanner(stream)
>   while(scanner.hasNextLine){
>     stuff += scanner.nextLine
>   }
>   stuff.toList.mkString("\n")
> }
>
> The next step is to parse the CSV string and convert to a dataframe, which
> will populate a Hive/HBase table.
>
> If you can help, I would be truly grateful.
>
> Thanks,
> Ben
>
>
> On Mar 30, 2016, at 2:06 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
> just to clarify are you talking about databricks csv package.
>
> $SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
>
> Where are these zipped files? Are they copied to a staging directory in
> hdfs?
>
> HTH
>
> Dr Mich Talebzadeh
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 30 March 2016 at 15:17, Benjamin Kim <bbuil...@gmail.com> wrote:
>
>> I have a quick question. I have downloaded multiple zipped files from S3
>> and unzipped each one of them into strings. The next step is to parse using
>> a CSV parser. I want to know if there is a way to easily use the spark csv
>> package for this?
>>
>> Thanks,
>> Ben
>>
>>
>
>
