Hi Vince,

We had a similar case a while back. I tried two approaches: Spark against the
Hive metastore, and Hive using Spark as its execution engine.

Hive version: 2
Spark (as the Hive execution engine): 1.3.1

Basically

--1) Move the .csv data into HDFS (a sketch follows this list)
--2) Create an external table (all columns as string)
--3) Create the ORC table (majority of columns as int)
--4) Insert the data from the external table into the Hive ORC table,
compressed as ZLIB
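
For step 1, assuming the 2008.csv file was first downloaded locally, something
like this from spark-shell would do (a rough sketch; the local path is
hypothetical, the HDFS URI is the one the DDL below points at):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// connect to the same namenode the external table's LOCATION uses
val fs = FileSystem.get(new java.net.URI("hdfs://rhes564:9000"), new Configuration())
// hypothetical local path for the downloaded file
fs.copyFromLocalFile(new Path("/tmp/2008.csv"), new Path("/data/stg2/2008.csv"))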

ORC seems a good candidate in this case, as a simple INSERT/SELECT from the
external table into ORC takes very little time. I bucketed the ORC table and
marked it as transactional in case one needs to make a correction to it later
(not really needed).

The whole process was timestamped; it took just under 5 minutes to complete
for 7,009,728 rows in total (roughly 24,000 rows/second).


+-------------------------+--+
|        starttime        |
+-------------------------+--+
| 19/03/2016 22:21:19.19  |
+-------------------------+--+

+-------------------------+--+
|         endtime         |
+-------------------------+--+
| 19/03/2016 22:26:12.12  |
+-------------------------+--+



This is the code. I will try Spark code with Parquet later (a rough sketch
follows the HiveQL below).

-- note: the 'ss.ss' pattern prints the seconds field twice (hence the 19.19
-- and 12.12 above); use plain 'ss' if you only want the seconds once
select from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS StartTime;
set hive.exec.reducers.max=256;
use test;
--set hive.execution.engine=mr;
--2)
DROP TABLE IF EXISTS stg_t2;
CREATE EXTERNAL TABLE stg_t2 (
   Year         string
,  Month        string
,  DayofMonth   string
,  DayOfWeek    string
,  DepTime      string
,  CRSDepTime   string
,  ArrTime      string
,  CRSArrTime   string
,  UniqueCarrier        string
,  FlightNum    string
,  TailNum      string
,  ActualElapsedTime    string
,  CRSElapsedTime       string
,  AirTime      string
,  ArrDelay     string
,  DepDelay     string
,  Origin       string
,  Dest         string
,  Distance     string
,  TaxiIn       string
,  TaxiOut      string
,  Cancelled    string
,  CancellationCode     string
,  Diverted     string
,  CarrierDelay         string
,  WeatherDelay         string
,  NASDelay     string
,  SecurityDelay        string
,  LateAircraftDelay    string
)
COMMENT 'from csv file from http://stat-computing.org/dataexpo/2009/the-data.html, year 2008'
ROW FORMAT serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE
LOCATION 'hdfs://rhes564:9000/data/stg2'
TBLPROPERTIES ("skip.header.line.count"="1")
;
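-- Aside: once stg_t2 is registered in the metastore, Spark can see it too; a
-- quick sanity check from spark-shell might look like this (a sketch, assuming
-- a Hive-enabled Spark build):
--   val hc = new org.apache.spark.sql.hive.HiveContext(sc)
--   hc.sql("use test")
--   hc.sql("select count(1) from stg_t2").collect().foreach(println)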
--3)
DROP TABLE IF EXISTS t2008;
CREATE TABLE t2008 (
   Year         int
,  Month        int
,  DayofMonth   int
,  DayOfWeek    int
,  DepTime      string
,  CRSDepTime   string
,  ArrTime      string
,  CRSArrTime   string
,  UniqueCarrier        string
,  FlightNum    int
,  TailNum      string  -- tail numbers are alphanumeric (e.g. N712SW); an int cast would null them
,  ActualElapsedTime    int
,  CRSElapsedTime       int
,  AirTime      int
,  ArrDelay     int
,  DepDelay     int
,  Origin       string
,  Dest         string
,  Distance     int
,  TaxiIn       int
,  TaxiOut      int
,  Cancelled    string
,  CancellationCode     string
,  Diverted     string
,  CarrierDelay         int
,  WeatherDelay         int
,  NASDelay     int
,  SecurityDelay        int
,  LateAircraftDelay    int
)
COMMENT 'from csv file from http://stat-computing.org/dataexpo/2009/the-data.html, year 2008'
CLUSTERED BY (Year, Month, DayofMonth, DayOfWeek, DepTime) INTO 256 BUCKETS
STORED AS ORC
TBLPROPERTIES ( "orc.compress"="ZLIB",
"transactional"="true")
;
--4) Put the data into the target table; Hive casts the strings to the target
--   types implicitly on insert
INSERT INTO TABLE t2008
SELECT
          *
FROM
stg_t2
;
--select count(1) from t2008;
select from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS EndTime;
!exit
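
For the Parquet route, something along these lines should work from spark-shell
(an untested sketch using the same spark-csv API as in your snippet; the output
path is an assumption):

// read the same CSV straight off HDFS via the spark-csv package
val df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")   // let spark-csv work out the int columns
  .load("hdfs://rhes564:9000/data/stg2/2008.csv")

// write it out as Parquet
df.write.format("parquet").save("hdfs://rhes564:9000/data/t2008_parquet")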

HTH


Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 19 March 2016 at 19:18, Vincent Ohprecio <ohpre...@gmail.com> wrote:

>
> For some reason, writing data from the Spark shell to CSV using the `csv
> package` takes almost an hour to dump to disk. Am I going crazy or did I do
> this wrong? I tried writing to Parquet first and it's fast as normal.
>
> On my MacBook Pro (16 GB, 2.2 GHz Intel Core i7, 1 TB) the machine's CPUs go
> crazy and it sounds like it's taking off like a plane ... lol
>
> Here is the code if anyone wants to experiment:
>
> // ./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.4.0
>
> //
>
> // version 2.0.0-SNAPSHOT
>
> // Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.7.0_80)
>
> // http://stat-computing.org/dataexpo/2009/the-data.html
>
>
> def time[R](block: => R): R = {
>
>     val t0 = System.nanoTime()
>
>     val result = block    // call-by-name
>
>     val t1 = System.nanoTime()
>
>     println("Elapsed time: " + (t1 - t0) + "ns")
>
>     result
>
> }
>
>
> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("header",
> "true").load("/Users/employee/Downloads/2008.csv")
>
> val df_1 = df.withColumnRenamed("Year","oldYear")
>
> val df_2 =
> df_1.withColumn("Year",df_1.col("oldYear").cast("int")).drop("oldYear")
>
> def convertColumn(df: org.apache.spark.sql.DataFrame, name:String,
> newType:String) = {
>
>   val df_1 = df.withColumnRenamed(name, "swap")
>
>   df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap")
>
> }
>
> val df_3 = convertColumn(df_2, "ArrDelay", "int")
>
> val df_4 = convertColumn(df_3, "DepDelay", "int")  // df_3, not df_2, to keep the ArrDelay cast
>
>
> // test write to parquet is fast
>
> df_4.select("Year",
> "Cancelled").write.format("parquet").save("yearAndCancelled.parquet")
>
>
> val selectedData = df_4.select("Year", "Cancelled")
>
>
>
> val howLong =
> time(selectedData.write.format("com.databricks.spark.csv").option("header",
> "true").save("output.csv"))
>
>
> //scala> val howLong =
> time(selectedData.write.format("com.databricks.spark.csv").option("header",
> "true").save("output.csv"))
>
> //Elapsed time: 3488272270000ns
>
> //howLong: Unit = ()
>
> https://gist.github.com/bigsnarfdude/581b780ce85d7aaecbcb
>
>
>
>
>
