coalesce executor memory explosion

2016-02-24 Thread Christopher Brady
Short: Why does coalesce use huge amounts of memory? How does it work 
internally?


Long version:
I asked a similar question a few weeks ago, but I have a simpler test 
with better numbers now. I have an RDD created from some HDFS files. I 
want to sample it and then coalesce it into fewer partitions. For some 
reason coalesce uses huge amounts of memory. From what I've read, 
coalesce does not require full partitions to be in memory at once, so I 
don't understand what's causing this. Can anyone explain to me why 
coalesce needs so much memory? Are there any rules for determining the 
best number of partitions to coalesce into?


Spark version:
1.5.0

Test data:
241 GB of compressed parquet files

Executors:
27 executors
16 GB memory each
3 cores each

In my tests I'm reading the data from HDFS, sampling it, coalescing into 
fewer partitions, and then doing a count just to have an action.
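
For concreteness, a minimal sketch of the test in the Java API. The path, 
and the use of TextInputFormat with LongWritable/Text, are placeholders 
standing in for the real parquet input format:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CoalesceMemoryTest {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("yarn-client", "Coalesce test");

        // One partition per input split; ~14,844 for the data set above.
        // TextInputFormat is only a stand-in for the real parquet format.
        JavaPairRDD<LongWritable, Text> input = sc.hadoopFile(
                "hdfs:///path/to/data",   // placeholder path
                TextInputFormat.class, LongWritable.class, Text.class);

        long count = input
                .sample(false, 0.00075)   // keep ~0.075% of the records
                .coalesce(668)            // narrow coalesce, no shuffle
                .count();                 // action, just to force evaluation

        System.out.println("sampled count = " + count);
        sc.stop();
    }
}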


Without coalesce there is no memory issue. The size of the data makes no 
difference:
hadoopFile (creates 14,844 partitions) -> sample (fraction 0.00075) -> 
count()

Per executor memory usage: 0.4 GB

Adding coalesce increases the memory usage substantially and it is still 
using more partitions than I'd like:
hadoopFile (creates 14,844 partitions) -> sample (fraction 0.00075) -> 
coalesce (to 668 partitions) -> count()

Per executor memory usage: 3.1 GB

Going down to 201 partitions uses most of the available memory just for 
the coalesce:
hadoopFile (creates 14,844 partitions) -> sample (fraction 0.00075) -> 
coalesce (to 201 partitions) -> count()

Per executor memory usage: 9.8 GB

Any number of partitions smaller than this crashes all the executors 
with out-of-memory errors. I don't really understand what is happening in 
Spark; that sample size should result in partitions smaller than the 
original partitions.
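
For reference, the back-of-the-envelope numbers implied by the figures 
above. This is only arithmetic on the quoted values, not a claim about 
what Spark does internally:

public class CoalesceArithmetic {
    public static void main(String[] args) {
        int inputPartitions = 14844;  // partitions created by hadoopFile
        int targetPartitions = 201;   // coalesce target that nearly exhausts memory
        int executorMemoryGB = 16;    // heap per executor
        int coresPerExecutor = 3;     // concurrent tasks per executor

        // A narrow coalesce groups parent partitions: each output partition
        // here is assembled from roughly 74 of the original partitions.
        int parentsPerOutput =
                (int) Math.ceil((double) inputPartitions / targetPartitions);

        // Each of the 3 concurrent tasks gets about a third of the heap,
        // i.e. ~5.3 GB, before Spark's own memory fractions are applied.
        double memoryPerTaskGB = (double) executorMemoryGB / coresPerExecutor;

        System.out.printf("~%d parents per coalesced partition, ~%.1f GB per task%n",
                parentsPerOutput, memoryPerTaskGB);
    }
}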


I've gone through the Spark documentation, YouTube videos, and the 
Learning Spark book, but I haven't seen anything about this. Thanks.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: coalesce and executor memory

2016-02-14 Thread Christopher Brady


I tried it without the cache, but it didn't change anything. The reason for the 
cache is that other actions will be performed on this RDD, even though it never 
gets that far. 

I can make it work by just increasing the number of partitions, but I was 
hoping to get a better understanding of how Spark works rather than just 
using trial and error every time I hit this issue. 


- Original Message - 
From: silvio.fior...@granturing.com 
To: christopher.br...@oracle.com, ko...@tresata.com 
Cc: user@spark.apache.org 
Sent: Sunday, February 14, 2016 8:27:09 AM GMT -05:00 US/Canada Eastern 
Subject: RE: coalesce and executor memory 

Actually, rereading your email I see you're caching. But ‘cache’ uses 
MEMORY_ONLY. Do you see errors about losing partitions as your job is running? 



Are you sure you need to cache if you're just saving to disk? Can you try the 
coalesce without cache? 
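
If the RDD really does need to be reused, a small sketch of the usual 
alternative to cache() is persist with a spillable storage level; this is 
standard RDD API, and the helper below is only illustrative:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.StorageLevels;

public class CacheVsPersist {
    static <T> JavaRDD<T> reusable(JavaRDD<T> rdd) {
        // cache() is shorthand for MEMORY_ONLY: partitions that don't fit
        // in memory are dropped and recomputed when needed again.
        // MEMORY_AND_DISK spills them to local disk instead, trading disk
        // I/O for fewer recomputations.
        return rdd.persist(StorageLevels.MEMORY_AND_DISK);
    }
}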

From: Christopher Brady 
Sent: Friday, February 12, 2016 8:34 PM 
To: Koert Kuipers ; Silvio Fiorito 
Cc: user 
Subject: Re: coalesce and executor memory 


Thank you for the responses. The map function just changes the format of the 
record slightly, so I don't think that would be the cause of the memory 
problem. 

So if I have 3 cores per executor, I need to be able to fit 3 partitions per 
executor within whatever I specify for the executor memory? Is there a way I 
can programmatically find a number of partitions I can coalesce down to without 
running out of memory? Is there some documentation where this is explained? 
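
As far as I know there is no built-in API for this. Below is only a rough 
heuristic sketch; every name and number in it is an assumption, and the 
measurements earlier in this thread suggest such an estimate can be far 
too optimistic:

public class TargetPartitions {
    /**
     * Back-of-the-envelope heuristic only: aim for an estimated
     * per-partition size well below the memory available to one task.
     * estimatedDataGB is the guessed uncompressed size of the data
     * actually being coalesced (e.g. after sampling).
     */
    static int targetPartitions(double estimatedDataGB,
                                double executorMemGB,
                                int coresPerExecutor,
                                double safetyFactor) {  // e.g. 0.25
        double perTaskBudgetGB = (executorMemGB / coresPerExecutor) * safetyFactor;
        return (int) Math.max(1, Math.ceil(estimatedDataGB / perTaskBudgetGB));
    }
}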



On 02/12/2016 05:10 PM, Koert Kuipers wrote: 

in spark, every partition needs to fit in the memory available to the core 
processing it. 

as you coalesce you reduce number of partitions, increasing partition size. at 
some point the partition no longer fits in memory. 



On Fri, Feb 12, 2016 at 4:50 PM, Silvio Fiorito <silvio.fior...@granturing.com> wrote: 


Coalesce essentially reduces parallelism, so fewer cores are getting more 
records. Be aware that it could also lead to loss of data locality, depending 
on how far you reduce. Depending on what you’re doing in the map operation, it 
could lead to OOM errors. Can you give more details as to what the code for the 
map looks like? 

On 2/12/16, 1:13 PM, "Christopher Brady" <christopher.br...@oracle.com> wrote: 

>Can anyone help me understand why using coalesce causes my executors to 
>crash with out of memory? What happens during coalesce that increases 
>memory usage so much? 
> 
>If I do: 
>hadoopFile -> sample -> cache -> map -> saveAsNewAPIHadoopFile 
> 
>everything works fine, but if I do: 
>hadoopFile -> sample -> coalesce -> cache -> map -> saveAsNewAPIHadoopFile 
> 
>my executors crash with out of memory exceptions. 
> 
>Is there any documentation that explains what causes the increased 
>memory requirements with coalesce? It seems to be less of a problem if I 
>coalesce into a larger number of partitions, but I'm not sure why this 
>is. How would I estimate how much additional memory the coalesce requires? 
> 
>Thanks. 
> 
>- 
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>For additional commands, e-mail: user-h...@spark.apache.org 
> 




Re: coalesce and executor memory

2016-02-12 Thread Christopher Brady
Thank you for the responses. The map function just changes the format of 
the record slightly, so I don't think that would be the cause of the 
memory problem.


So if I have 3 cores per executor, I need to be able to fit 3 partitions 
per executor within whatever I specify for the executor memory? Is there 
a way I can programmatically find a number of partitions I can coalesce 
down to without running out of memory? Is there some documentation where 
this is explained?



On 02/12/2016 05:10 PM, Koert Kuipers wrote:
in spark, every partition needs to fit in the memory available to the 
core processing it.


as you coalesce you reduce number of partitions, increasing partition 
size. at some point the partition no longer fits in memory.


On Fri, Feb 12, 2016 at 4:50 PM, Silvio Fiorito 
<silvio.fior...@granturing.com> wrote:


Coalesce essentially reduces parallelism, so fewer cores are
getting more records. Be aware that it could also lead to loss of
data locality, depending on how far you reduce. Depending on what
you’re doing in the map operation, it could lead to OOM errors.
Can you give more details as to what the code for the map looks like?




On 2/12/16, 1:13 PM, "Christopher Brady"
<christopher.br...@oracle.com> wrote:

>Can anyone help me understand why using coalesce causes my executors to
>crash with out of memory? What happens during coalesce that increases
>memory usage so much?
>
>If I do:
>hadoopFile -> sample -> cache -> map -> saveAsNewAPIHadoopFile
>
>everything works fine, but if I do:
>hadoopFile -> sample -> coalesce -> cache -> map -> saveAsNewAPIHadoopFile
>
>my executors crash with out of memory exceptions.
>
>Is there any documentation that explains what causes the increased
>memory requirements with coalesce? It seems to be less of a problem if I
>coalesce into a larger number of partitions, but I'm not sure why this
>is. How would I estimate how much additional memory the coalesce requires?
>
>Thanks.
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>






coalesce and executor memory

2016-02-12 Thread Christopher Brady
Can anyone help me understand why using coalesce causes my executors to 
crash with out of memory? What happens during coalesce that increases 
memory usage so much?


If I do:
hadoopFile -> sample -> cache -> map -> saveAsNewAPIHadoopFile

everything works fine, but if I do:
hadoopFile -> sample -> coalesce -> cache -> map -> saveAsNewAPIHadoopFile

my executors crash with out of memory exceptions.
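
A sketch of the failing shape of the job in the Java API; the input and 
output formats, types, and paths are stand-ins, since the actual formats 
aren't shown in the thread:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class CoalescePipeline {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("yarn-client", "Coalesce pipeline");

        JavaPairRDD<LongWritable, Text> input = sc.hadoopFile(
                "hdfs:///path/to/input",  // placeholder
                TextInputFormat.class, LongWritable.class, Text.class);

        JavaPairRDD<NullWritable, Text> out = input
                .sample(false, 0.00075)
                .coalesce(201)            // the step that blows up memory
                .cache()                  // cache() keeps partitions in memory only
                .mapToPair(new PairFunction<Tuple2<LongWritable, Text>, NullWritable, Text>() {
                    @Override
                    public Tuple2<NullWritable, Text> call(Tuple2<LongWritable, Text> rec) {
                        // stand-in for the "slight format change" mentioned earlier in the thread
                        return new Tuple2<NullWritable, Text>(NullWritable.get(), rec._2());
                    }
                });

        out.saveAsNewAPIHadoopFile("hdfs:///path/to/output",
                NullWritable.class, Text.class, TextOutputFormat.class);
        sc.stop();
    }
}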

Is there any documentation that explains what causes the increased 
memory requirements with coalesce? It seems to be less of a problem if I 
coalesce into a larger number of partitions, but I'm not sure why this 
is. How would I estimate how much additional memory the coalesce requires?


Thanks.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



DataFrameWriter.format(String) is there a list of options?

2015-12-23 Thread Christopher Brady

The documentation for DataFrameWriter.format(String) says:
"Specifies the underlying output data source. Built-in options include 
"parquet", "json", etc."


What options are there other than parquet and json? From googling I 
found "com.databricks.spark.avro", but that doesn't seem to work 
correctly for me. (It always hangs at the end.) Can I use CSV? Can I use 
sequence files?
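
A sketch assuming Spark 1.5: parquet, json, orc (orc needs a HiveContext), 
and jdbc are built-in sources, while CSV and Avro come from the external 
Databricks packages (spark-csv / spark-avro, added with --packages). The 
paths below are placeholders:

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SaveMode;

public class WriterFormats {
    // df is assumed to come from a HiveContext (needed for the "orc" source).
    static void writeExamples(DataFrame df) {
        df.write().format("parquet").save("/tmp/out_parquet");
        df.write().format("json").mode(SaveMode.Overwrite).save("/tmp/out_json");
        df.write().format("orc").save("/tmp/out_orc");

        // External data source package: com.databricks:spark-csv
        df.write()
          .format("com.databricks.spark.csv")
          .option("header", "true")
          .save("/tmp/out_csv");
    }
}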


count(*) performance in Hive vs Spark DataFrames

2015-12-16 Thread Christopher Brady
I'm having an issue where count(*) returns almost immediately using 
Hive, but takes over 10 min using DataFrames. The table data is on HDFS 
in an uncompressed CSV format. How is it possible for Hive to get the 
count so fast? Is it caching this or putting it in the metastore?


Is there anything I can do to optimize the performance of this using 
DataFrames, or should I try doing just the count with Hive using JDBC?


I've tried writing this 2 ways:

// Variant 1: push the count into a SQL query
try (final JavaSparkContext sc = new JavaSparkContext("yarn-cluster", "Test app")) {
    final HiveContext sqlContext = new HiveContext(sc.sc());
    DataFrame df = sqlContext.sql("SELECT count(*) FROM my_table");
    df.collect();
}

// Variant 2: load the table as a DataFrame and count it
try (final JavaSparkContext sc = new JavaSparkContext("yarn-cluster", "Test app")) {
    final HiveContext sqlContext = new HiveContext(sc.sc());
    DataFrame df = sqlContext.table("my_table");
    df.count();
}

Thanks.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Classpath problem trying to use DataFrames

2015-12-14 Thread Christopher Brady
Thanks for the response. I lost access to my cluster over the weekend, 
so I had to wait until today to check.


All of the correct Hive jars are in classpath.txt. Also, this error 
seems to be happening in the driver rather than the executors. It's 
running in yarn-client mode, so it should use the classpath of my local 
JVM, which also contains the Hive jars. I also checked for that class 
specifically and it is there. Does Spark do anything funny on the driver 
side that would make the Hive classes on the classpath unavailable?
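
Since the failure here is on the driver, the driver-side analogue of the 
executor classpath setting quoted below may also be worth a try (same 
HIVE_BASE_DIR caveat; this is a guess, not a confirmed fix):

--conf "spark.driver.extraClassPath=$HIVE_BASE_DIR/hive/lib/*"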



On 12/11/2015 11:08 PM, Harsh J wrote:
Do you have all your hive jars listed in the classpath.txt / 
SPARK_DIST_CLASSPATH env., specifically the hive-exec jar? Is the 
location of that jar also the same on all the distributed hosts?


Passing an explicit executor classpath string may also help overcome 
this (replace HIVE_BASE_DIR with the root of your Hive installation):


--conf "spark.executor.extraClassPath=$HIVE_BASE_DIR/hive/lib/*"

On Sat, Dec 12, 2015 at 6:32 AM Christopher Brady 
<christopher.br...@oracle.com> wrote:


I'm trying to run a basic "Hello world" type example using DataFrames
with Hive in yarn-client mode. My code is:

JavaSparkContext sc = new JavaSparkContext("yarn-client", "Test app");
HiveContext sqlContext = new HiveContext(sc.sc());
sqlContext.sql("SELECT * FROM my_table").count();

The exception I get on the driver is:
java.lang.ClassNotFoundException:
org.apache.hadoop.hive.ql.plan.TableDesc

There are no exceptions on the executors.

That class is definitely on the classpath of the driver, and it runs
without errors in local mode. I haven't been able to find any similar
errors on google. Does anyone know what I'm doing wrong?

The full stack trace is included below:

java.lang.NoClassDefFoundError:
Lorg/apache/hadoop/hive/ql/plan/TableDesc;
 at java.lang.Class.getDeclaredFields0(Native Method)
 at java.lang.Class.privateGetDeclaredFields(Class.java:2436)
 at java.lang.Class.getDeclaredField(Class.java:1946)
 at
java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)
 at
java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72)
 at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480)
 at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
 at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
 at
java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
 at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
 at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
 at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at
scala.collection.immutable.$colon$colon.readObject(List.scala:362)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
 at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at

Classpath problem trying to use DataFrames

2015-12-11 Thread Christopher Brady
I'm trying to run a basic "Hello world" type example using DataFrames 
with Hive in yarn-client mode. My code is:


JavaSparkContext sc = new JavaSparkContext("yarn-client", "Test app");
HiveContext sqlContext = new HiveContext(sc.sc());
sqlContext.sql("SELECT * FROM my_table").count();

The exception I get on the driver is:
java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.plan.TableDesc

There are no exceptions on the executors.

That class is definitely on the classpath of the driver, and it runs 
without errors in local mode. I haven't been able to find any similar 
errors on google. Does anyone know what I'm doing wrong?


The full stack trace is included below:

java.lang.NoClassDefFoundError: Lorg/apache/hadoop/hive/ql/plan/TableDesc;
at java.lang.Class.getDeclaredFields0(Native Method)
at java.lang.Class.privateGetDeclaredFields(Class.java:2436)
at java.lang.Class.getDeclaredField(Class.java:1946)
at 
java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)

at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)

at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990