Smarter views

2017-05-25 Thread Maurin Lenglart
Hi,
I am working on some big-data-related technology, and I am trying to get a
sense of how hard it would be to enhance views in Impala so that whenever
someone queries a view, only the columns needed for that particular query are
computed, not all of the view's columns.

A simple example would be:
create view test_view_a as
select coalesce(col1, col2) as testCol1, col3 from physical_table1

Then if I run a query like this:
select col3 from test_view_a;
the coalesce for testCol1 should not be evaluated, because it is not used.

thanks


Re: Spark streaming to kafka exactly once

2017-03-23 Thread Maurin Lenglart
Ok,
Thanks for your answers

On 3/22/17, 1:34 PM, "Cody Koeninger" <c...@koeninger.org> wrote:

If you're talking about reading the same message multiple times in a
failure situation, see

https://github.com/koeninger/kafka-exactly-once

If you're talking about producing the same message multiple times in a
failure situation, keep an eye on


https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

If you're talking about producers just misbehaving and sending
different copies of what is essentially the same message from a domain
perspective, you have to dedupe that with your own logic.
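A minimal sketch of that kind of key-based de-duplication inside the
streaming job, assuming each event is a dict carrying a unique business key
such as "event_id" (the field name, the "stream" DStream and the
send_partition helper are illustrative, not from this thread):

    def dedupe_and_send(rdd):
        # Keep one event per event_id within the micro-batch; duplicates that
        # span batches or failures still need external state or a downstream dedupe.
        deduped = (rdd.map(lambda e: (e["event_id"], e))
                      .reduceByKey(lambda a, b: a)
                      .values())
        deduped.foreachPartition(send_partition)

    stream.foreachRDD(dedupe_and_send)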

On Wed, Mar 22, 2017 at 2:52 PM, Matt Deaver <mattrdea...@gmail.com> wrote:
> You have to handle de-duplication upstream or downstream. It might
> technically be possible to handle this in Spark but you'll probably have a
> better time handling duplicates in the service that reads from Kafka.
>
> On Wed, Mar 22, 2017 at 1:49 PM, Maurin Lenglart <mau...@cuberonlabs.com>
> wrote:
>>
>> Hi,
>> we are trying to build a Spark Streaming solution that subscribes to and
>> pushes to Kafka.
>>
>> But we are running into a problem with duplicate events.
>>
>> Right now, I am doing a “foreachRDD”, looping over the messages of each
>> partition, and sending those messages to Kafka.
>>
>>
>>
>> Is there any good way of solving that issue?
>>
>>
>>
>> thanks
>
>
>
>
> --
> Regards,
>
> Matt
> Data Engineer
> https://www.linkedin.com/in/mdeaver
> http://mattdeav.pythonanywhere.com/




Spark streaming to kafka exactly once

2017-03-22 Thread Maurin Lenglart
Hi,
we are trying to build a Spark Streaming solution that subscribes to and pushes
to Kafka.
But we are running into a problem with duplicate events.
Right now, I am doing a “foreachRDD”, looping over the messages of each
partition and sending those messages to Kafka.
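In code, the current approach looks roughly like the sketch below (the
kafka-python client, broker address and topic name are assumptions, not
details from this thread):

    from kafka import KafkaProducer

    def send_partition(messages):
        # One producer per partition, created on the executor
        producer = KafkaProducer(bootstrap_servers="broker:9092")
        for msg in messages:
            producer.send("output-topic", msg.encode("utf-8"))
        producer.flush()
        producer.close()

    stream.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition))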

Is there any good way of solving that issue?

thanks


.tar.bz2 in spark

2016-12-08 Thread Maurin Lenglart
Hi,
I am trying to load a JSON file compressed as .tar.bz2, but Spark throws an error.
I am using pyspark with Spark 1.6.2 (Cloudera 5.9).

What would be the best way to handle this?
I don’t want a separate non-Spark job just to uncompress the data…
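One possible workaround, sketched under the assumption that each archive
member is newline-delimited JSON: Spark reads plain .json.bz2 files directly,
but .tar archives have to be unpacked by hand, for example with binaryFiles
plus the tarfile module (the path below is a placeholder):

    import io
    import tarfile

    def extract_json_lines(path_and_bytes):
        # path_and_bytes is (file path, raw archive bytes) from binaryFiles()
        _, content = path_and_bytes
        with tarfile.open(fileobj=io.BytesIO(content), mode="r:bz2") as tar:
            for member in tar.getmembers():
                if member.isfile():
                    text = tar.extractfile(member).read().decode("utf-8")
                    for line in text.splitlines():
                        if line.strip():
                            yield line

    raw = sc.binaryFiles("/data/myfile.tar.bz2")
    df = sqlContext.read.json(raw.flatMap(extract_json_lines))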

thanks


SizeEstimator for python

2016-08-15 Thread Maurin Lenglart
Hi,
Is there a way to estimate the size of a dataframe in python?
Something similar to 
https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/util/SizeEstimator.html
 ?
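There is no official PySpark equivalent, but one rough workaround is to go
through the Py4J gateway: cache the DataFrame, force it to materialize, and
read the storage info from the JVM SparkContext (a sketch; the helper name is
illustrative, and the figure is the cached in-memory footprint of everything
currently persisted, not an on-disk size):

    def estimate_cached_size_bytes(sc, df):
        df.persist()
        df.count()  # force materialization so the storage info is populated
        # Sums over all currently cached RDDs, so run on an otherwise clean context
        return sum(info.memSize() for info in sc._jsc.sc().getRDDStorageInfo())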

thanks


dynamic coalesce to pick file size

2016-07-26 Thread Maurin Lenglart
Hi,
I am running a SQL query that returns a DataFrame, then writing the result
with “df.write”, but the result gets written as a lot of small files (~100
files of about 200 KB each). So now I am doing a “.coalesce(2)” before the
write.
But the number “2” that I picked is static. Is there a way to pick the number
dynamically, based on the desired file size? (around 256 MB would be perfect)

I am running Spark 1.6 on CDH using YARN; the files are written in Parquet
format.
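One way to pick the number dynamically, sketched here with the Hadoop
FileSystem API reached through the Py4J gateway (the source path and the
256 MB target are assumptions; compressed output files will usually come out
smaller than the raw input size suggests):

    target_bytes = 256 * 1024 * 1024

    hadoop_conf = sc._jsc.hadoopConfiguration()
    Path = sc._jvm.org.apache.hadoop.fs.Path
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)

    # Size on disk of the data being rewritten (placeholder path)
    total_bytes = fs.getContentSummary(Path("/data/my_table")).getLength()
    num_files = max(1, int(total_bytes / target_bytes))

    df.coalesce(num_files).write.parquet("/data/my_table_compacted")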

Thanks



Re: Spark Website

2016-07-13 Thread Maurin Lenglart
Same here

From: Benjamin Kim 
Date: Wednesday, July 13, 2016 at 11:47 AM
To: manish ranjan 
Cc: user 
Subject: Re: Spark Website

It takes me to the directories instead of the webpage.

On Jul 13, 2016, at 11:45 AM, manish ranjan wrote:

working for me. What do you mean 'as supposed to'?

~Manish


On Wed, Jul 13, 2016 at 11:45 AM, Benjamin Kim wrote:
Has anyone noticed that the spark.apache.org is not 
working as supposed to?


-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org




Re: orc vs parquet aggregation, orc is really slow

2016-04-17 Thread Maurin Lenglart

Let me explain my architecture a little:
I have one cluster with Hive and Spark. There I create my databases, create
the tables and insert data into them.
If I execute this query: self.sqlContext.sql(“SELECT `event_date` as
`event_date`,sum(`bookings`) as `bookings`,sum(`dealviews`) as `dealviews` FROM
myTable WHERE `event_date` >= '2016-01-06' AND `event_date` <= '2016-04-02'
GROUP BY `event_date` LIMIT 2”) using ORC, it takes 15 seconds.

But then I export the tables to another cluster where I don’t have Hive. There
I load my tables using
sqlContext.read.format(‘orc').load(‘mytableFiles’).registerAsTable(‘myTable’)
and the same query takes 50 seconds.

If I do the same process using Parquet tables, the same query takes 8 seconds.


thanks

From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Sunday, April 17, 2016 at 2:52 PM
To: maurin lenglart <mau...@cuberonlabs.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: orc vs parquet aggregation, orc is really slow

hang on so it takes 15 seconds to switch the database context with 
HiveContext.sql("use myDatabase") ?


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


On 17 April 2016 at 22:44, Maurin Lenglart <mau...@cuberonlabs.com> wrote:
The stats are only for one file in one partition. There are 17,970,737 rows in
total.
The table is not bucketed.

The problem is not inserting rows, the problem is with this SQL query:

“SELECT `event_date` as `event_date`,sum(`bookings`) as 
`bookings`,sum(`dealviews`) as `dealviews` FROM myTable WHERE  `event_date` >= 
'2016-01-06' AND `event_date` <= '2016-04-02' GROUP BY `event_date` LIMIT 2”

Benchmarks :

  *   8 seconds on parquet table loaded using  
sqlContext.read.format(‘parquet').load(‘mytableFiles’).registerAsTable(‘myTable’)
  *   50 seconds on ORC using  
sqlContext.read.format(‘orc').load(‘mytableFiles’).registerAsTable(‘myTable’)
  *   15 seconds on ORC using sqlContext(‘use myDatabase’)

The use case that I have is the second and slowest benchmark. Is there 
something I can do to speed that up?

thanks



From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Sunday, April 17, 2016 at 2:22 PM
To: maurin lenglart <mau...@cuberonlabs.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: orc vs parquet aggregation, orc is really slow

Hi Maurin,

Have you tried to create your table in Hive as parquet table? This table is 
pretty small with 100K rows.

Is Hive table bucketed at all? I gather your issue is inserting rows into Hive 
table at the moment that taking longer time (compared to Parquet)?

HTH




Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


On 17 April 2016 at 21:43, Maurin Lenglart <mau...@cuberonlabs.com> wrote:
Hi,
I am using the Cloudera distribution, and when I do a “desc formatted table” I
don’t get all the table parameters.

But I did a hive orcfiledump on one random file (I replaced some of the values
that may be sensitive):
hive --orcfiledump 
/user/hive/warehouse/myDB.db/mytable/event_date=2016-04-01/part-1
2016-04-17 01:36:15,424 WARN  [main] mapreduce.TableMapReduceUtil: The 
hbase-prefix-tree module jar containing PrefixTreeCodec is not present.  
Continuing without it.
Structure for 
/user/hive/warehouse/myDB.db/mytable/event_date=2016-04-01/part-1
File Version: 0.12 with HIVE_8732
16/04/17 01:36:18 INFO orc.ReaderImpl: Reading ORC rows from 
/user/hive/warehouse/myDB.db/mytable/event_date=2016-04-01/part-1 with 
{include: null, offset: 0, length: 9223372036854775807}
Rows: 104260
Compression: ZLIB
Compression size: 262144
Type: struct

Stripe Statistics:
  Stripe 1:
Column 0: count: 104260 hasNull: false
Column 1: count: 104260 hasNull: false min: XXX max: XXX sum: 365456
Column 2: count: 104260 hasNull: false min: XXX max: XXX sum: 634322
Column 3: count: 104260 h

Re: orc vs parquet aggregation, orc is really slow

2016-04-17 Thread Maurin Lenglart
The stats are only for one file in one partition. There are 17,970,737 rows in
total.
The table is not bucketed.

The problem is not inserting rows, the problem is with this SQL query:

“SELECT `event_date` as `event_date`,sum(`bookings`) as 
`bookings`,sum(`dealviews`) as `dealviews` FROM myTable WHERE  `event_date` >= 
'2016-01-06' AND `event_date` <= '2016-04-02' GROUP BY `event_date` LIMIT 2”

Benchmarks :

  *   8 seconds on parquet table loaded using  
sqlContext.read.format(‘parquet').load(‘mytableFiles’).registerAsTable(‘myTable’)
  *   50 seconds on ORC using  
sqlContext.read.format(‘orc').load(‘mytableFiles’).registerAsTable(‘myTable’)
  *   15 seconds on ORC using sqlContext(‘use myDatabase’)

The use case that I have is the second and slowest benchmark. Is there 
something I can do to speed that up?

thanks



From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Sunday, April 17, 2016 at 2:22 PM
To: maurin lenglart <mau...@cuberonlabs.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: orc vs parquet aggregation, orc is really slow

Hi Maurin,

Have you tried to create your table in Hive as parquet table? This table is 
pretty small with 100K rows.

Is Hive table bucketed at all? I gather your issue is inserting rows into Hive 
table at the moment that taking longer time (compared to Parquet)?

HTH




Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


On 17 April 2016 at 21:43, Maurin Lenglart <mau...@cuberonlabs.com> wrote:
Hi,
I am using the Cloudera distribution, and when I do a “desc formatted table” I
don’t get all the table parameters.

But I did a hive orcfiledump on one random file (I replaced some of the values
that may be sensitive):
hive --orcfiledump 
/user/hive/warehouse/myDB.db/mytable/event_date=2016-04-01/part-1
2016-04-17 01:36:15,424 WARN  [main] mapreduce.TableMapReduceUtil: The 
hbase-prefix-tree module jar containing PrefixTreeCodec is not present.  
Continuing without it.
Structure for 
/user/hive/warehouse/myDB.db/mytable/event_date=2016-04-01/part-1
File Version: 0.12 with HIVE_8732
16/04/17 01:36:18 INFO orc.ReaderImpl: Reading ORC rows from 
/user/hive/warehouse/myDB.db/mytable/event_date=2016-04-01/part-1 with 
{include: null, offset: 0, length: 9223372036854775807}
Rows: 104260
Compression: ZLIB
Compression size: 262144
Type: struct

Stripe Statistics:
  Stripe 1:
Column 0: count: 104260 hasNull: false
Column 1: count: 104260 hasNull: false min: XXX max: XXX sum: 365456
Column 2: count: 104260 hasNull: false min: XXX max: XXX sum: 634322
Column 3: count: 104260 hasNull: false min: XXX max: XXX sum: 738629
Column 4: count: 104260 hasNull: false min: 0 max: 1 sum: 37221
Column 5: count: 104260 hasNull: false min: XXX max: Others sum: 262478
Column 6: count: 104260 hasNull: false min:  max: XXX sum: 220591
Column 7: count: 104260 hasNull: false min:  max: XXX sum: 1102288
Column 8: count: 104260 hasNull: false min:  max: XXX sum: 1657073
Column 9: count: 104260 hasNull: false min: 1. XXX max: NULL sum: 730846
Column 10: count: 104260 hasNull: false min: 0 max: 152963 sum: 5481629
Column 11: count: 104260 hasNull: false min: 0 max: 45481 sum: 625946
Column 12: count: 104260 hasNull: false min: 0.0 max: 40220.0 sum: 324522.0
Column 13: count: 104260 hasNull: false min: 0.0 max: 201100.0 sum: 
6958348.122699987
Column 14: count: 104260 hasNull: false min: -2273.0 max: 39930.13977860418 
sum: 1546639.6964531767
Column 15: count: 104260 hasNull: false min: 0 max: 39269 sum: 233814
Column 16: count: 104260 hasNull: false min: 0.0 max: 4824.029119913681 
sum: 45711.881143417035
Column 17: count: 104260 hasNull: false min: 0 max: 46883 sum: 437866
Column 18: count: 104260 hasNull: false min: 0 max: 14402 sum: 33864
Column 19: count: 104260 hasNull: false min: 2016-04-03 max: 2016-04-03 
sum: 1042600

File Statistics:
  Column 0: count: 104260 hasNull: false
  Column 1: count: 104260 hasNull: false min: XXX max: XXX sum: 365456
  Column 2: count: 104260 hasNull: false min: XXX max: XXX sum: 634322
  Column 3: count: 104260 hasNull: false min: XXX max: Unknown Utm sum: 738629
  Column 4: count: 104260 hasNull: false min: 0 max: 1 sum: 37221
  Column 5: count: 104260 hasNull: false min: XXX max: Others sum: 262478
  Column 6: count: 104260 hasNull: false min:  max: XXX sum: 220591
  Column 7: count: 104260 hasNull: false min:  max: XXX sum: 1102288
  Column 8: count: 104260 hasNull: false min:  max: Travel sum: 1657073
  Column 9: count: 104260 hasNull: false min: 1. XXX max: NULL sum: 730846
  Column 10: count: 104260 hasNull: false mi

Re: orc vs parquet aggregation, orc is really slow

2016-04-17 Thread Maurin Lenglart
start: 2674 length 191
Stream: column 16 section ROW_INDEX start: 2865 length 295
Stream: column 17 section ROW_INDEX start: 3160 length 194
Stream: column 18 section ROW_INDEX start: 3354 length 192
Stream: column 19 section ROW_INDEX start: 3546 length 122
Stream: column 1 section DATA start: 3668 length 33586
Stream: column 1 section LENGTH start: 37254 length 7
Stream: column 1 section DICTIONARY_DATA start: 37261 length 14
Stream: column 2 section DATA start: 37275 length 40616
Stream: column 2 section LENGTH start: 77891 length 8
Stream: column 2 section DICTIONARY_DATA start: 77899 length 32
Stream: column 3 section DATA start: 77931 length 46120
Stream: column 3 section LENGTH start: 124051 length 17
Stream: column 3 section DICTIONARY_DATA start: 124068 length 99
Stream: column 4 section DATA start: 124167 length 26498
Stream: column 5 section DATA start: 150665 length 38409
Stream: column 5 section LENGTH start: 189074 length 8
Stream: column 5 section DICTIONARY_DATA start: 189082 length 30
Stream: column 6 section DATA start: 189112 length 9425
Stream: column 6 section LENGTH start: 198537 length 9
Stream: column 6 section DICTIONARY_DATA start: 198546 length 36
Stream: column 7 section DATA start: 198582 length 95465
Stream: column 7 section LENGTH start: 294047 length 127
Stream: column 7 section DICTIONARY_DATA start: 294174 length 1130
Stream: column 8 section DATA start: 295304 length 43896
Stream: column 8 section LENGTH start: 339200 length 16
Stream: column 8 section DICTIONARY_DATA start: 339216 length 107
Stream: column 9 section DATA start: 339323 length 42274
Stream: column 9 section LENGTH start: 381597 length 9
Stream: column 9 section DICTIONARY_DATA start: 381606 length 55
Stream: column 10 section DATA start: 381661 length 89206
Stream: column 11 section DATA start: 470867 length 46613
Stream: column 12 section DATA start: 517480 length 62630
Stream: column 13 section DATA start: 580110 length 103241
Stream: column 14 section DATA start: 683351 length 138479
Stream: column 15 section DATA start: 821830 length 34983
Stream: column 16 section DATA start: 856813 length 7917
Stream: column 17 section DATA start: 864730 length 47435
Stream: column 18 section DATA start: 912165 length 462
Stream: column 19 section DATA start: 912627 length 140
Stream: column 19 section LENGTH start: 912767 length 6
Stream: column 19 section DICTIONARY_DATA start: 912773 length 13
Encoding column 0: DIRECT
Encoding column 1: DICTIONARY_V2[3]
Encoding column 2: DICTIONARY_V2[5]
Encoding column 3: DICTIONARY_V2[12]
Encoding column 4: DIRECT_V2
Encoding column 5: DICTIONARY_V2[6]
Encoding column 6: DICTIONARY_V2[8]
Encoding column 7: DICTIONARY_V2[175]
Encoding column 8: DICTIONARY_V2[11]
Encoding column 9: DICTIONARY_V2[7]
Encoding column 10: DIRECT_V2
Encoding column 11: DIRECT_V2
Encoding column 12: DIRECT
Encoding column 13: DIRECT
Encoding column 14: DIRECT
Encoding column 15: DIRECT_V2
Encoding column 16: DIRECT
Encoding column 17: DIRECT_V2
Encoding column 18: DIRECT_V2
Encoding column 19: DICTIONARY_V2[1]

File length: 914171 bytes
Padding length: 0 bytes
Padding ratio: 0%


I also noticed something:
If I load a table like this:
sqlContext.read.format(‘orc').load(‘mytableFiles’).registerAsTable(‘myTable’)
the queries on myTable take at least twice as long as queries on the table
loaded through Hive directly.
For technical reasons my pipeline is not fully migrated to Hive tables, and in
a lot of places I still load the table manually and register it.
I only see this problem with the ORC format.
Do you see a way to get the same performance on manually loaded tables?

thanks





From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Saturday, April 16, 2016 at 4:14 AM
To: maurin lenglart <mau...@cuberonlabs.com>, "user @spark" <user@spark.apache.org>
Subject: Re: orc vs parquet aggregation, orc is really slow

Apologies that should read

desc formatted 

Example for table dummy

hive> desc formatted  dummy;
OK
# col_name  data_type   comment
id  int
clustered   int
scattered   int
randomised  int
random_string   varchar(50)
small_vcvarchar(10)
padding varchar(10)
# Detailed Table Information
Database:   test
Owner:  hduser
CreateTime: Sun Jan 31 06:09:56 GMT 2016
LastAccessTime: UNKNOWN
Retention:  0
Location:   hdfs://rhes564:9000/user/hive/warehouse/test.db/dummy

Re: orc vs parquet aggregation, orc is really slow

2016-04-16 Thread Maurin Lenglart
Hi,

I will put the date in the correct format in the future and see if that
changes anything.
The query I sent is just an example of one possible aggregation; I have a lot
of them on the same table, so I am not sure that sorting for all of them would
actually have an impact.

I am using the latest Cloudera release and I haven’t modified any component
versions. Do you think I should try to manually upgrade Hive?

thanks

From: Jörn Franke <jornfra...@gmail.com>
Date: Saturday, April 16, 2016 at 1:02 AM
To: maurin lenglart <mau...@cuberonlabs.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: orc vs parquet aggregation, orc is really slow


A general recommendation (besides the issue): do not store dates as strings. I
recommend making them ints here; it will be much faster in both cases.
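For instance, a hedged illustration of that suggestion, reusing the table and
column names from this thread (the yyyymmdd int format is an assumption): keep
event_date as an int partition column so the range predicate compares integers
instead of strings:

    sqlContext.sql("""
        SELECT `event_date`, sum(`bookings`) AS `bookings`,
               sum(`dealviews`) AS `dealviews`
        FROM myTable
        WHERE `event_date` >= 20160106 AND `event_date` <= 20160402
        GROUP BY `event_date`
    """)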

It could be that you load the data differently into the two tables. Generally,
for these tables, you should insert the data sorted in both cases.
It could also be that in one case you compress the file and in the other you
don’t. It is always good practice to spell out all options in the CREATE TABLE
statement, even the default ones.

Your Hive version seems a little outdated. Do you use Spark as an execution
engine? Then you should upgrade to a newer version of Hive. The Spark
execution engine on Hive is still a bit more experimental than Tez. It also
depends on which distribution you are using.

Normally I would expect both of them to perform similarly.

On 16 Apr 2016, at 09:20, Maurin Lenglart <mau...@cuberonlabs.com> wrote:

Hi,
I am executing this query:
“SELECT `event_date` as `event_date`,sum(`bookings`) as `bookings`,
sum(`dealviews`) as `dealviews` FROM myTable WHERE `event_date` >= '2016-01-06'
AND `event_date` <= '2016-04-02' GROUP BY `event_date` LIMIT 2”

My table was created with something like:
  CREATE TABLE myTable (
    bookings  DOUBLE,
    dealviews INT
  )
  PARTITIONED BY (event_date STRING)
  STORED AS ORC or PARQUET

Parquet takes 9 seconds of cumulative CPU.
ORC takes 50 seconds of cumulative CPU.

For ORC I tried
hiveContext.setConf(“Spark.Sql.Orc.FilterPushdown”,“true”)
but it didn’t change anything.

Am I missing something, or is Parquet just better for this type of query?

I am using Spark 1.6.0 with Hive 1.1.0.

thanks




Re: orc vs parquet aggregation, orc is really slow

2016-04-16 Thread Maurin Lenglart
Hi,
I have : 17970737 rows
I tried to do a “desc formatted statistics myTable” but I get “Error while 
compiling statement: FAILED: SemanticException [Error 10001]: Table not found 
statistics”
Even after doing something like : “ANALYZE TABLE myTable COMPUTE STATISTICS FOR 
COLUMNS"

Thank you for your answer.

From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Saturday, April 16, 2016 at 12:32 AM
To: maurin lenglart <mau...@cuberonlabs.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: orc vs parquet aggregation, orc is really slow

Have you analysed statistics on the ORC table? How many rows are there?

Also send the output of

desc formatted statistics 

HTH


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


On 16 April 2016 at 08:20, Maurin Lenglart <mau...@cuberonlabs.com> wrote:
Hi,
I am executing this query:
“SELECT `event_date` as `event_date`,sum(`bookings`) as `bookings`,
sum(`dealviews`) as `dealviews` FROM myTable WHERE `event_date` >= '2016-01-06'
AND `event_date` <= '2016-04-02' GROUP BY `event_date` LIMIT 2”

My table was created with something like:
  CREATE TABLE myTable (
    bookings  DOUBLE,
    dealviews INT
  )
  PARTITIONED BY (event_date STRING)
  STORED AS ORC or PARQUET

Parquet takes 9 seconds of cumulative CPU.
ORC takes 50 seconds of cumulative CPU.

For ORC I tried
hiveContext.setConf(“Spark.Sql.Orc.FilterPushdown”,“true”)
but it didn’t change anything.

Am I missing something, or is Parquet just better for this type of query?

I am using Spark 1.6.0 with Hive 1.1.0.

thanks





orc vs parquet aggregation, orc is really slow

2016-04-16 Thread Maurin Lenglart
Hi,
I am executing this query:
“SELECT `event_date` as `event_date`,sum(`bookings`) as `bookings`,
sum(`dealviews`) as `dealviews` FROM myTable WHERE `event_date` >= '2016-01-06'
AND `event_date` <= '2016-04-02' GROUP BY `event_date` LIMIT 2”

My table was created with something like:
  CREATE TABLE myTable (
    bookings  DOUBLE,
    dealviews INT
  )
  PARTITIONED BY (event_date STRING)
  STORED AS ORC or PARQUET

Parquet takes 9 seconds of cumulative CPU.
ORC takes 50 seconds of cumulative CPU.

For ORC I tried
hiveContext.setConf(“Spark.Sql.Orc.FilterPushdown”,“true”)
but it didn’t change anything.
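(Note to self: Spark SQL conf keys are case-sensitive, so it is probably worth
retrying with the lowercase spelling of the key,
    sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
before concluding the setting has no effect.)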

Am I missing something, or is Parquet just better for this type of query?

I am using Spark 1.6.0 with Hive 1.1.0.

thanks




Re: alter table add columns aternatives or hive refresh

2016-04-15 Thread Maurin Lenglart
Hi,

Following your answer I was able to make it work.
FYI:
Basically the solution is to manually create the table in Hive using a SQL
“CREATE TABLE” command.
When doing a saveAsTable, the Hive metastore doesn’t get the schema info of
the DataFrame.
So now my flow is (a sketch follows the list):

  *   Create a DataFrame
  *   If it is the first time I see the table, I generate a CREATE TABLE using
the DF.schema.fields.
  *   If it is not:
     *   I do a diff of my df schema and the myTable schema
     *   I do a SQL “ALTER TABLE ... ADD COLUMNS” for the table
     *   I use a df.withColumn for each column that is missing in the df
  *   Then I use df.insertInto myTable

I also migrated from Parquet to ORC; I am not sure whether this has an impact
or not.
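A minimal PySpark sketch of that flow (names are illustrative, and the real
pipeline also handles partitioning, which is omitted here):

    from pyspark.sql.functions import lit

    def sync_schema_and_insert(sqlContext, df, table_name):
        if table_name not in sqlContext.tableNames():
            # First time: generate a CREATE TABLE from the DataFrame schema
            cols = ", ".join("`%s` %s" % (f.name, f.dataType.simpleString())
                             for f in df.schema.fields)
            sqlContext.sql("CREATE TABLE %s (%s) STORED AS ORC" % (table_name, cols))
        else:
            table_schema = {f.name: f.dataType.simpleString()
                            for f in sqlContext.table(table_name).schema.fields}
            # Columns in the df but not yet in the table -> ALTER TABLE ADD COLUMNS
            new_cols = [f for f in df.schema.fields if f.name not in table_schema]
            if new_cols:
                added = ", ".join("`%s` %s" % (f.name, f.dataType.simpleString())
                                  for f in new_cols)
                sqlContext.sql("ALTER TABLE %s ADD COLUMNS (%s)" % (table_name, added))
            # Columns in the table but missing from the df -> fill with nulls
            for name, dtype in table_schema.items():
                if name not in df.columns:
                    df = df.withColumn(name, lit(None).cast(dtype))
        # insertInto matches columns by position, so order them like the table
        df = df.select(*sqlContext.table(table_name).columns)
        df.write.insertInto(table_name)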

Thank you for your help.

From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Sunday, April 10, 2016 at 11:54 PM
To: maurin lenglart <mau...@cuberonlabs.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: alter table add columns aternatives or hive refresh

This should work. Make sure that you use HiveContext.sql and sqlContext 
correctly

This is an example in Spark, reading a CSV file, doing some manipulation, 
creating a temp table, saving data as ORC file, adding another column and 
inserting values to table in Hive with default values for new rows

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
//
  val conf = new SparkConf().
   setAppName("ImportCSV").
   setMaster("local[12]").
   set("spark.driver.allowMultipleContexts", "true").
   set("spark.hadoop.validateOutputSpecs", "false")
  val sc = new SparkContext(conf)
  // Create sqlContext based on HiveContext
  val sqlContext = new HiveContext(sc)
  import sqlContext.implicits._
  val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
  //
  // Get a DF first based on Databricks CSV libraries
  //
  val df = 
HiveContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", "true").load("/data/stg/table2")
  //
  // Next filter out empty rows (last column has to be > "") and get rid of the
  // "?" special character. Also get rid of "," in money fields
  // Example csv cell £2,500.00 --> need to transform to plain 2500.00
  //
  val a = df.
  filter(col("Total") > "").
  map(x => (x.getString(0),x.getString(1), 
x.getString(2).substring(1).replace(",", "").toDouble, 
x.getString(3).substring(1).replace(",", "").toDouble, 
x.getString(4).substring(1).replace(",", "").toDouble))
   //
   // convert this RDD to DF and create a Spark temporary table
   //
   a.toDF.registerTempTable("tmp")
  //
  // Need to create and populate target ORC table t3 in database test in Hive
  //
  HiveContext.sql("use test")
  HiveContext.sql("DROP TABLE IF EXISTS test.t3")
  var sqltext : String = ""
  sqltext = """
  CREATE TABLE test.t3 (
   INVOICENUMBER  String
  ,PAYMENTDATEString
  ,NETDOUBLE
  ,VATDOUBLE
  ,TOTAL  DOUBLE
  )
  COMMENT 'from csv file from excel sheet'
  STORED AS ORC
  TBLPROPERTIES ( "orc.compress"="ZLIB" )
  """
  HiveContext.sql(sqltext)
  // Note you can only see Spark temporary table in sqlContext NOT HiveContext
  val results = sqlContext.sql("SELECT * FROM tmp")
  // clean up the file in HDFS directory first if exists
  val hadoopConf = new org.apache.hadoop.conf.Configuration()
  val hdfs = org.apache.hadoop.fs.FileSystem.get(new 
java.net.URI("hdfs://rhes564:9000"), hadoopConf)
  val output = "hdfs://rhes564:9000/user/hive/warehouse/test.db/t3"   // The 
path for Hive table just created
  try { hdfs.delete(new org.apache.hadoop.fs.Path(output), true) } catch { case 
_ : Throwable => { } }

  results.write.format("orc").save(output)
//
  sqlContext.sql("ALTER TABLE test.t3 ADD COLUMNS (new_col VARCHAR(30))")
  sqlContext.sql("INSERT INTO test.t3 SELECT *, 'London' FROM tmp")
  HiveContext.sql("SELECT * FROM test.t3 ORDER BY 1").collect.foreach(println)

HTH



Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


On 11 April 2016 at 01:36, Maurin Lenglart <mau...@cuberonlabs.com> wrote:

Re: alter table add columns aternatives or hive refresh

2016-04-11 Thread Maurin Lenglart
I will try that during the next w-e.

Thanks you for your answers.


From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Sunday, April 10, 2016 at 11:54 PM
To: maurin lenglart <mau...@cuberonlabs.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: alter table add columns aternatives or hive refresh

This should work. Make sure that you use HiveContext.sql and sqlContext 
correctly

This is an example in Spark, reading a CSV file, doing some manipulation, 
creating a temp table, saving data as ORC file, adding another column and 
inserting values to table in Hive with default values for new rows

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
//
  val conf = new SparkConf().
   setAppName("ImportCSV").
   setMaster("local[12]").
   set("spark.driver.allowMultipleContexts", "true").
   set("spark.hadoop.validateOutputSpecs", "false")
  val sc = new SparkContext(conf)
  // Create sqlContext based on HiveContext
  val sqlContext = new HiveContext(sc)
  import sqlContext.implicits._
  val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
  //
  // Get a DF first based on Databricks CSV libraries
  //
  val df = 
HiveContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", "true").load("/data/stg/table2")
  //
  // Next filter out empty rows (last column has to be > "") and get rid of the
  // "?" special character. Also get rid of "," in money fields
  // Example csv cell £2,500.00 --> need to transform to plain 2500.00
  //
  val a = df.
  filter(col("Total") > "").
  map(x => (x.getString(0),x.getString(1), 
x.getString(2).substring(1).replace(",", "").toDouble, 
x.getString(3).substring(1).replace(",", "").toDouble, 
x.getString(4).substring(1).replace(",", "").toDouble))
   //
   // convert this RDD to DF and create a Spark temporary table
   //
   a.toDF.registerTempTable("tmp")
  //
  // Need to create and populate target ORC table t3 in database test in Hive
  //
  HiveContext.sql("use test")
  HiveContext.sql("DROP TABLE IF EXISTS test.t3")
  var sqltext : String = ""
  sqltext = """
  CREATE TABLE test.t3 (
   INVOICENUMBER  String
  ,PAYMENTDATEString
  ,NETDOUBLE
  ,VATDOUBLE
  ,TOTAL  DOUBLE
  )
  COMMENT 'from csv file from excel sheet'
  STORED AS ORC
  TBLPROPERTIES ( "orc.compress"="ZLIB" )
  """
  HiveContext.sql(sqltext)
  // Note you can only see Spark temporary table in sqlContext NOT HiveContext
  val results = sqlContext.sql("SELECT * FROM tmp")
  // clean up the file in HDFS directory first if exists
  val hadoopConf = new org.apache.hadoop.conf.Configuration()
  val hdfs = org.apache.hadoop.fs.FileSystem.get(new 
java.net.URI("hdfs://rhes564:9000"), hadoopConf)
  val output = "hdfs://rhes564:9000/user/hive/warehouse/test.db/t3"   // The 
path for Hive table just created
  try { hdfs.delete(new org.apache.hadoop.fs.Path(output), true) } catch { case 
_ : Throwable => { } }

  results.write.format("orc").save(output)
//
  sqlContext.sql("ALTER TABLE test.t3 ADD COLUMNS (new_col VARCHAR(30))")
  sqlContext.sql("INSERT INTO test.t3 SELECT *, 'London' FROM tmp")
  HiveContext.sql("SELECT * FROM test.t3 ORDER BY 1").collect.foreach(println)

HTH



Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


On 11 April 2016 at 01:36, Maurin Lenglart <mau...@cuberonlabs.com> wrote:
Your solution works in hive, but not in spark, even if I use hive context.
I tried to create a temp table and then this query:
 - sqlContext.sql("insert into table myTable select * from myTable_temp”)
But I still get the same error.

thanks

From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Sunday, April 10, 2016 at 12:25 PM
To: "user @spark" <user@spark.apache.org>

Subject: Re: alter table add columns aternatives or hive refresh

Hi,

I am confining myself to Hive tables. As I stated it before I have not tried it 
in Spark. So

Re: alter table add columns aternatives or hive refresh

2016-04-10 Thread Maurin Lenglart
Your solution works in hive, but not in spark, even if I use hive context.
I tried to create a temp table and then this query:
 - sqlContext.sql("insert into table myTable select * from myTable_temp”)
But I still get the same error.

thanks

From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Sunday, April 10, 2016 at 12:25 PM
To: "user @spark" <user@spark.apache.org>
Subject: Re: alter table add columns aternatives or hive refresh

Hi,

I am confining myself to Hive tables. As I stated before, I have not tried it
in Spark, so I stand corrected.

Let us try this simple test in Hive


-- Create table
hive> create table testme(col1 int);
OK
--insert a row
hive> insert into testme values(1);

Loading data to table test.testme
OK
-- Add a new column to testme
hive> alter table testme add columns (new_col varchar(30));
OK
Time taken: 0.055 seconds

-- Expect one row here

hive> select * from testme;
OK
1   NULL
-- Add a new row including values for new_col. This should work
hive> insert into testme values(1,'London');
Loading data to table test.testme
OK
hive> select * from testme;
OK
1   NULL
1   London
Time taken: 0.074 seconds, Fetched: 2 row(s)
-- Now update the new column
hive> update testme set new_col = 'NY';
FAILED: SemanticException [Error 10297]: Attempt to do update or delete on 
table test.testme that does not use an AcidOutputFormat or is not bucketed

So this is Hive. You can add new rows including values for the new column but 
cannot update the null values. Will this work for you?

HTH


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


On 10 April 2016 at 19:34, Maurin Lenglart <mau...@cuberonlabs.com> wrote:
Hi,
So basically you are telling me that I need to recreate the table and re-insert
everything every time I update a column?
I understand the constraints, but that solution doesn’t look good to me. I am
updating the schema every day and the table is a couple of TB of data.

Do you see any other option that would allow me not to move TBs of data every day?

Thanks for your answer

From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Sunday, April 10, 2016 at 3:41 AM
To: maurin lenglart <mau...@cuberonlabs.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: alter table add columns aternatives or hive refresh

I have not tried it on Spark, but a column added in Hive to an existing table
cannot be updated for existing rows. In other words, the new column is set to
null, which does not require a change in the existing file length.

So basically, as I understand it, when a column is added to an existing table:

1. The metadata for the underlying table will be updated
2. The new column will by default have a null value
3. The existing rows cannot have the new column updated to a non-null value
4. New rows can have non-null values set for the new column
5. No SQL operation can be done on that column, for example select * from
<table> where new_column IS NOT NULL
6. The easiest option is to create a new table with the new column and do an
insert/select from the existing table with values set for the new column

HTH


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


On 10 April 2016 at 05:06, Maurin Lenglart <mau...@cuberonlabs.com> wrote:
Hi,
I am trying to add columns to a table that I created with the “saveAsTable” api.
I update the columns using sqlContext.sql(‘alter table myTable add columns
(mycol string)’).
The next time I create a df and save it into the same table with the new
columns, I get a:
“ParquetRelation requires that the query in the SELECT clause of the INSERT
INTO/OVERWRITE statement generates the same number of columns as its schema.”

Also these two commands don’t return the same columns:
1. sqlContext.table(‘myTable’).schema.fields  <— wrong result
2. sqlContext.sql(’show columns in mytable’)  <—— good result

It seems to be a known bug: https://issues.apache.org/jira/browse/SPARK-9764
(see related bugs)

But I am wondering, how else can I update the columns or make sure that Spark
picks up the new columns?

I already tried refreshTable and restarting Spark.

thanks





Re: alter table add columns aternatives or hive refresh

2016-04-10 Thread Maurin Lenglart
Hi,
So basically you are telling me that I need to recreate the table and re-insert
everything every time I update a column?
I understand the constraints, but that solution doesn’t look good to me. I am
updating the schema every day and the table is a couple of TB of data.

Do you see any other option that would allow me not to move TBs of data every day?

Thanks for your answer

From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Sunday, April 10, 2016 at 3:41 AM
To: maurin lenglart <mau...@cuberonlabs.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: alter table add columns aternatives or hive refresh

I have not tried it on Spark, but a column added in Hive to an existing table
cannot be updated for existing rows. In other words, the new column is set to
null, which does not require a change in the existing file length.

So basically, as I understand it, when a column is added to an existing table:

1. The metadata for the underlying table will be updated
2. The new column will by default have a null value
3. The existing rows cannot have the new column updated to a non-null value
4. New rows can have non-null values set for the new column
5. No SQL operation can be done on that column, for example select * from
<table> where new_column IS NOT NULL
6. The easiest option is to create a new table with the new column and do an
insert/select from the existing table with values set for the new column

HTH


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


On 10 April 2016 at 05:06, Maurin Lenglart <mau...@cuberonlabs.com> wrote:
Hi,
I am trying to add columns to a table that I created with the “saveAsTable” api.
I update the columns using sqlContext.sql(‘alter table myTable add columns
(mycol string)’).
The next time I create a df and save it into the same table with the new
columns, I get a:
“ParquetRelation requires that the query in the SELECT clause of the INSERT
INTO/OVERWRITE statement generates the same number of columns as its schema.”

Also these two commands don’t return the same columns:
1. sqlContext.table(‘myTable’).schema.fields  <— wrong result
2. sqlContext.sql(’show columns in mytable’)  <—— good result

It seems to be a known bug: https://issues.apache.org/jira/browse/SPARK-9764
(see related bugs)

But I am wondering, how else can I update the columns or make sure that Spark
picks up the new columns?

I already tried refreshTable and restarting Spark.

thanks




alter table add columns aternatives or hive refresh

2016-04-09 Thread Maurin Lenglart
Hi,
I am trying to add columns to a table that I created with the “saveAsTable” api.
I update the columns using sqlContext.sql(‘alter table myTable add columns
(mycol string)’).
The next time I create a df and save it into the same table with the new
columns, I get a:
“ParquetRelation requires that the query in the SELECT clause of the INSERT
INTO/OVERWRITE statement generates the same number of columns as its schema.”

Also these two commands don’t return the same columns:
1. sqlContext.table(‘myTable’).schema.fields  <— wrong result
2. sqlContext.sql(’show columns in mytable’)  <—— good result

It seems to be a known bug: https://issues.apache.org/jira/browse/SPARK-9764
(see related bugs)

But I am wondering, how else can I update the columns or make sure that Spark
picks up the new columns?

I already tried refreshTable and restarting Spark.

thanks



Re: Sample sql query using pyspark

2016-03-01 Thread Maurin Lenglart
Hi,
Thanks for the hint. I have tried removing the limit from the query, but the
result is still the same. If I understand correctly, the “sample()” function
takes a sample of the result of the query, not a sample of the original table
that I am querying.

I have a business use case to sample a lot of different queries in prod, so I
can’t just insert 100 rows into another table.

I am thinking about doing something like (pseudo code) :
my_df = SELECT * FROM my_table WHERE  `event_date` >= '2015-11-14' AND 
`event_date` <= '2016-02-19’
my_sample = my_df.sample(0.1)
Result = sample.groupBy("Category").agg(sum("bookings"), sum("dealviews”))
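A runnable version of that pseudo-code (using the table and columns from the
query quoted later in this thread; the 10% fraction is just the example value):

    from pyspark.sql import functions as F

    my_df = sqlContext.sql("""
        SELECT * FROM groupon_dropbox
        WHERE `event_date` >= '2015-11-14' AND `event_date` <= '2016-02-19'
    """)
    my_sample = my_df.sample(False, 0.1)
    result = my_sample.groupBy("Category").agg(
        F.sum("bookings").alias("bookings"),
        F.sum("dealviews").alias("dealviews"))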


Thanks for your answer.



From: James Barney <jamesbarne...@gmail.com>
Date: Tuesday, March 1, 2016 at 7:01 AM
To: maurin lenglart <mau...@cuberonlabs.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: Sample sql query using pyspark

Maurin,

I don't know the technical reason why but: try removing the 'limit 100' part of 
your query. I was trying to do something similar the other week and what I 
found is that each executor doesn't necessarily get the same 100 rows. Joins 
would fail or result with a bunch of nulls when keys weren't found between the 
slices of 100 rows.

Once I removed the 'limit ' part of my query, all the results were the same 
across the board and taking samples worked again.

If the amount of data is too large, or you're trying to just test on a smaller 
size, just define another table and insert only 100 rows into that table.

I hope that helps!

On Tue, Mar 1, 2016 at 3:10 AM, Maurin Lenglart <mau...@cuberonlabs.com> wrote:
Hi,
I am trying to take a sample of a SQL query result in order to make the query
run faster.
My query looks like this:
SELECT `Category` as `Category`,sum(`bookings`) as `bookings`,sum(`dealviews`) 
as `dealviews` FROM groupon_dropbox WHERE  `event_date` >= '2015-11-14' AND 
`event_date` <= '2016-02-19' GROUP BY `Category` LIMIT 100

The table is partitioned by event_date. And the code I am using is:
 df = self.df_from_sql(sql, srcs)

results = df.sample(False, 0.5).collect()

 The results are a little bit different, but the execution time is almost the 
same. Am I missing something?


thanks



Sample sql query using pyspark

2016-03-01 Thread Maurin Lenglart
Hi,
I am trying to take a sample of a SQL query result in order to make the query
run faster.
My query looks like this:
SELECT `Category` as `Category`,sum(`bookings`) as `bookings`,sum(`dealviews`) 
as `dealviews` FROM groupon_dropbox WHERE  `event_date` >= '2015-11-14' AND 
`event_date` <= '2016-02-19' GROUP BY `Category` LIMIT 100

The table is partitioned by event_date. And the code I am using is:
 df = self.df_from_sql(sql, srcs)

results = df.sample(False, 0.5).collect()

 The results are a little bit different, but the execution time is almost the 
same. Am I missing something?


thanks


_metada file throwing an "GC overhead limit exceeded" after a write

2016-02-12 Thread Maurin Lenglart
Hi,

I am currently using Spark in Python. I have my master, worker and driver on
the same machine, in different Docker containers. I am using Spark 1.6.
The configuration I am using looks like this:

CONFIG["spark.executor.memory"] = "100g"
CONFIG["spark.executor.cores"] = "11"
CONFIG["spark.cores.max"] = "11"
CONFIG["spark.scheduler.mode"] = "FAIR"
CONFIG["spark.default.parallelism"] = “60"

I am running a SQL query and writing the result into one partitioned table.
The code looks like this:

df = self.sqlContext.sql(selectsql)
parquet_dir = self.dir_for_table(tablename)
df.write.partitionBy(partition_name).mode(mode).parquet(parquet_dir)

The code works and my partition gets created correctly, but it always throws
an exception (see below).

What makes me think it is a problem with the _metadata file is that the
exception is thrown after the partition is created. And when I do an ls -ltr
on my folder, the _metadata file is the last one that gets modified and its
size is zero.
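One thing I am considering trying (not verified yet) is disabling the Parquet
summary files at the Hadoop level before the write, so the _metadata file is
not generated at all:

    sc._jsc.hadoopConfiguration().set("parquet.enable.summary-metadata", "false")

    df.write.partitionBy(partition_name).mode(mode).parquet(parquet_dir)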

Any ideas why?

Thanks


The exception:


16/02/12 14:08:21 INFO ParseDriver: Parse Completed
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.
Exception in thread "qtp1919278883-98" java.lang.OutOfMemoryError: GC overhead 
limit exceeded
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.addWaiter(AbstractQueuedSynchronizer.java:606)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
at 
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290)
at 
org.spark-project.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:247)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:544)
at java.lang.Thread.run(Thread.java:745)
An error occurred while calling o57.parquet.
: org.apache.spark.SparkException: Job aborted.
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:156)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
at 
org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
at 
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
at 
org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:329)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
at 
org.apache.parquet.format.converter.ParquetMetadataConverter.addRowGroup(ParquetMetadataConverter.java:154)
at