coalesce executor memory explosion
Short version: Why does coalesce use huge amounts of memory, and how does it work internally?

Long version: I asked a similar question a few weeks ago, but I now have a simpler test with better numbers. I have an RDD created from some HDFS files. I want to sample it and then coalesce it into fewer partitions. For some reason coalesce uses huge amounts of memory. From what I've read, coalesce does not require full partitions to be held in memory at once, so I don't understand what's causing this. Can anyone explain why coalesce needs so much memory? Are there any rules for determining the best number of partitions to coalesce into?

Spark version: 1.5.0
Test data: 241 GB of compressed Parquet files
Executors: 27 executors, 16 GB memory each, 3 cores each

In my tests I'm reading the data from HDFS, sampling it, coalescing into fewer partitions, and then doing a count just to have an action. Without coalesce there is no memory issue, and the size of the data makes no difference:

hadoopFile (creates 14,844 partitions) -> sample (fraction 0.00075) -> count()
Per-executor memory usage: 0.4 GB

Adding coalesce increases the memory usage substantially, and it is still using more partitions than I'd like:

hadoopFile (creates 14,844 partitions) -> sample (fraction 0.00075) -> coalesce (to 668 partitions) -> count()
Per-executor memory usage: 3.1 GB

Going down to 201 partitions uses most of the available memory just for the coalesce:

hadoopFile (creates 14,844 partitions) -> sample (fraction 0.00075) -> coalesce (to 201 partitions) -> count()
Per-executor memory usage: 9.8 GB

Any number of partitions smaller than this crashes all the executors with out-of-memory errors. I don't really understand what is happening in Spark; that sample fraction should result in partitions much smaller than the originals. I've gone through the Spark documentation, YouTube videos, and the Learning Spark book, but I haven't seen anything about this. Thanks.
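P.S. For reference, the test is roughly the following (a simplified sketch: textFile stands in for the real hadoopFile call and Parquet InputFormat, and the path is a placeholder):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf().setAppName("coalesce-memory-test");
JavaSparkContext sc = new JavaSparkContext(conf);

// ~14,844 input partitions in the real data; textFile is only a stand-in for hadoopFile here.
JavaRDD<String> input = sc.textFile("hdfs:///path/to/parquet/data");
JavaRDD<String> sampled = input.sample(false, 0.00075);   // sample without replacement
JavaRDD<String> coalesced = sampled.coalesce(668);        // fewer partitions -> memory use jumps
long rows = coalesced.count();                            // action just to run the job

sc.stop();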
Re: coalesce and executor memory
I tried it without the cache, but it didn't change anything. The reason for the cache is that other actions will be performed on this RDD, even though it never gets that far. I can make it work by just increasing the number of partitions, but I was hoping to get a better understanding of how Spark works rather than just using trial and error every time I hit this issue.

----- Original Message -----
From: silvio.fior...@granturing.com
To: christopher.br...@oracle.com, ko...@tresata.com
Cc: user@spark.apache.org
Sent: Sunday, February 14, 2016 8:27:09 AM GMT -05:00 US/Canada Eastern
Subject: RE: coalesce and executor memory

Actually, rereading your email I see you're caching. But 'cache' uses MEMORY_ONLY. Do you see errors about losing partitions as your job is running? Are you sure you need to cache if you're just saving to disk? Can you try the coalesce without cache?

From: Christopher Brady
Sent: Friday, February 12, 2016 8:34 PM
To: Koert Kuipers; Silvio Fiorito
Cc: user
Subject: Re: coalesce and executor memory

Thank you for the responses. The map function just changes the format of the record slightly, so I don't think that would be the cause of the memory problem. So if I have 3 cores per executor, I need to be able to fit 3 partitions per executor within whatever I specify for the executor memory? Is there a way I can programmatically find a number of partitions I can coalesce down to without running out of memory? Is there some documentation where this is explained?

On 02/12/2016 05:10 PM, Koert Kuipers wrote:
In Spark, every partition needs to fit in the memory available to the core processing it. As you coalesce, you reduce the number of partitions, increasing the partition size. At some point the partition no longer fits in memory.

On Fri, Feb 12, 2016 at 4:50 PM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:
Coalesce essentially reduces parallelism, so fewer cores are getting more records. Be aware that it could also lead to loss of data locality, depending on how far you reduce. Depending on what you're doing in the map operation, it could lead to OOM errors. Can you give more details as to what the code for the map looks like?

On 2/12/16, 1:13 PM, "Christopher Brady" <christopher.br...@oracle.com> wrote:
> Can anyone help me understand why using coalesce causes my executors to
> crash with out of memory? What happens during coalesce that increases
> memory usage so much?
>
> If I do:
> hadoopFile -> sample -> cache -> map -> saveAsNewAPIHadoopFile
>
> everything works fine, but if I do:
> hadoopFile -> sample -> coalesce -> cache -> map -> saveAsNewAPIHadoopFile
>
> my executors crash with out of memory exceptions.
>
> Is there any documentation that explains what causes the increased
> memory requirements with coalesce? It seems to be less of a problem if I
> coalesce into a larger number of partitions, but I'm not sure why this
> is. How would I estimate how much additional memory the coalesce requires?
>
> Thanks.
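P.S. One more thing I plan to try, sketched below: coalesce takes an optional second boolean argument, and passing true adds a shuffle so records are redistributed evenly instead of each task having to read several whole parent partitions (at the cost of shuffling the sampled data). The numbers are just the ones from my tests, and I haven't confirmed this avoids the OOM; setup is as in the earlier test, with a placeholder path.

JavaRDD<String> sampled = sc.textFile("hdfs:///path/to/data").sample(false, 0.00075);

// Default: no shuffle. Each of the 201 output partitions is built by one task reading
// roughly 74 whole parent partitions, so all of that input goes through a single core.
JavaRDD<String> narrow = sampled.coalesce(201);

// With shuffle=true this behaves like repartition(201): it adds a shuffle stage, but
// spreads the records evenly across the 201 output partitions.
JavaRDD<String> shuffled = sampled.coalesce(201, true);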
Re: coalesce and executor memory
Thank you for the responses. The map function just changes the format of the record slightly, so I don't think that would be the cause of the memory problem. So if I have 3 cores per executor, I need to be able to fit 3 partitions per executor within whatever I specify for the executor memory? Is there a way I can programmatically find a number of partitions I can coalesce down to without running out of memory? Is there some documentation where this is explained?

On 02/12/2016 05:10 PM, Koert Kuipers wrote:
In Spark, every partition needs to fit in the memory available to the core processing it. As you coalesce, you reduce the number of partitions, increasing the partition size. At some point the partition no longer fits in memory.

On Fri, Feb 12, 2016 at 4:50 PM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:
Coalesce essentially reduces parallelism, so fewer cores are getting more records. Be aware that it could also lead to loss of data locality, depending on how far you reduce. Depending on what you're doing in the map operation, it could lead to OOM errors. Can you give more details as to what the code for the map looks like?

On 2/12/16, 1:13 PM, "Christopher Brady" <christopher.br...@oracle.com> wrote:
> Can anyone help me understand why using coalesce causes my executors to
> crash with out of memory? What happens during coalesce that increases
> memory usage so much?
>
> If I do:
> hadoopFile -> sample -> cache -> map -> saveAsNewAPIHadoopFile
>
> everything works fine, but if I do:
> hadoopFile -> sample -> coalesce -> cache -> map -> saveAsNewAPIHadoopFile
>
> my executors crash with out of memory exceptions.
>
> Is there any documentation that explains what causes the increased
> memory requirements with coalesce? It seems to be less of a problem if I
> coalesce into a larger number of partitions, but I'm not sure why this
> is. How would I estimate how much additional memory the coalesce requires?
>
> Thanks.
coalesce and executor memory
Can anyone help me understand why using coalesce causes my executors to crash with out of memory? What happens during coalesce that increases memory usage so much?

If I do:

hadoopFile -> sample -> cache -> map -> saveAsNewAPIHadoopFile

everything works fine, but if I do:

hadoopFile -> sample -> coalesce -> cache -> map -> saveAsNewAPIHadoopFile

my executors crash with out of memory exceptions.

Is there any documentation that explains what causes the increased memory requirements with coalesce? It seems to be less of a problem if I coalesce into a larger number of partitions, but I'm not sure why this is. How would I estimate how much additional memory the coalesce requires?

Thanks.
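P.S. For context, the failing job looks roughly like this (a simplified sketch: the paths, record handling, and output classes are placeholders for what the real job does):

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("coalesce-save-sketch"));

JavaRDD<String> sampled = sc.textFile("hdfs:///path/to/input")   // stand-in for hadoopFile
        .sample(false, 0.00075)
        .coalesce(668)        // removing this step makes the job run without memory problems
        .cache();             // cached because later actions reuse this RDD

// The real map only reformats each record slightly; this placeholder just wraps it in a Text value.
JavaPairRDD<NullWritable, Text> out = sampled.mapToPair(
        new PairFunction<String, NullWritable, Text>() {
            @Override
            public Tuple2<NullWritable, Text> call(String line) {
                return new Tuple2<>(NullWritable.get(), new Text(line));
            }
        });

out.saveAsNewAPIHadoopFile("hdfs:///path/to/output",
        NullWritable.class, Text.class, TextOutputFormat.class);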
DataFrameWriter.format(String) is there a list of options?
The documentation for DataFrameWriter.format(String) says: "Specifies the underlying output data source. Built-in options include "parquet", "json", etc." What options are there other than parquet and json? From googling I found "com.databricks.spark.avro", but that doesn't seem to work correctly for me. (It always hangs at the end.) Can I use CSV? Can I use sequence files?
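To make the question concrete, here is what I'm picturing (a sketch only, not verified on my cluster: df is an existing DataFrame, the paths are placeholders, and I'm assuming CSV/Avro come from the external spark-csv / spark-avro packages rather than being built in):

// Built-in sources in 1.5 include "parquet", "json", "jdbc", and "orc" (ORC via a HiveContext).
df.write().format("parquet").save("hdfs:///tmp/out_parquet");
df.write().format("json").save("hdfs:///tmp/out_json");
df.write().format("orc").save("hdfs:///tmp/out_orc");

// CSV and Avro come from external packages (spark-csv / spark-avro), added e.g. with
//   spark-submit --packages com.databricks:spark-csv_2.10:1.3.0,com.databricks:spark-avro_2.10:2.0.1 ...
df.write().format("com.databricks.spark.csv").option("header", "true").save("hdfs:///tmp/out_csv");
df.write().format("com.databricks.spark.avro").save("hdfs:///tmp/out_avro");

// As far as I can tell there is no sequence-file source for DataFrames; that would mean dropping
// down to the RDD API (e.g. JavaPairRDD.saveAsHadoopFile with SequenceFileOutputFormat).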
count(*) performance in Hive vs Spark DataFrames
I'm having an issue where count(*) returns almost immediately using Hive, but takes over 10 minutes using DataFrames. The table data is on HDFS in an uncompressed CSV format. How is it possible for Hive to get the count so fast? Is it caching this or putting it in the metastore? Is there anything I can do to optimize the performance of this using DataFrames, or should I try doing just the count with Hive using JDBC?

I've tried writing this two ways:

try (final JavaSparkContext sc = new JavaSparkContext("yarn-cluster", "Test app")) {
    final HiveContext sqlContext = new HiveContext(sc.sc());
    DataFrame df = sqlContext.sql("SELECT count(*) FROM my_table");
    df.collect();
}

try (final JavaSparkContext sc = new JavaSparkContext("yarn-cluster", "Test app")) {
    final HiveContext sqlContext = new HiveContext(sc.sc());
    DataFrame df = sqlContext.table("my_table");
    df.count();
}

Thanks.
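P.S. If the JDBC route is worth testing, I'd expect it to look roughly like this (a sketch only: the HiveServer2 host, port, and credentials are placeholders, and it assumes the Hive JDBC driver is on the classpath):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Placeholder HiveServer2 endpoint and user.
String url = "jdbc:hive2://hiveserver-host:10000/default";

try (Connection conn = DriverManager.getConnection(url, "hive_user", "");
     Statement stmt = conn.createStatement();
     ResultSet rs = stmt.executeQuery("SELECT count(*) FROM my_table")) {
    if (rs.next()) {
        System.out.println("count: " + rs.getLong(1));
    }
}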
Re: Classpath problem trying to use DataFrames
Thanks for the response. I lost access to my cluster over the weekend, so I had to wait until today to check. All of the correct Hive jars are in classpath.txt. Also, this error seems to be happening in the driver rather than the executors. It's running in yarn-client mode, so it should use the classpath of my local JVM, which also contains the Hive jars. I also checked for that class specifically and it is there. Does Spark do anything funny on the driver side that would make the Hive classes on the classpath unavailable?

On 12/11/2015 11:08 PM, Harsh J wrote:
Do you have all your Hive jars listed in classpath.txt / the SPARK_DIST_CLASSPATH env. variable, specifically the hive-exec jar? Is the location of that jar also the same on all the distributed hosts? Passing an explicit executor classpath string may also help overcome this (replace HIVE_BASE_DIR with the root of your Hive installation):

--conf "spark.executor.extraClassPath=$HIVE_BASE_DIR/hive/lib/*"

On Sat, Dec 12, 2015 at 6:32 AM Christopher Brady <christopher.br...@oracle.com> wrote:
I'm trying to run a basic "Hello world" type example using DataFrames with Hive in yarn-client mode. My code is:

JavaSparkContext sc = new JavaSparkContext("yarn-client", "Test app");
HiveContext sqlContext = new HiveContext(sc.sc());
sqlContext.sql("SELECT * FROM my_table").count();

The exception I get on the driver is: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.plan.TableDesc

There are no exceptions on the executors. That class is definitely on the classpath of the driver, and it runs without errors in local mode. I haven't been able to find any similar errors on Google. Does anyone know what I'm doing wrong? The full stack trace is included below:

java.lang.NoClassDefFoundError: Lorg/apache/hadoop/hive/ql/plan/TableDesc; at java.lang.Class.getDeclaredFields0(Native Method) at java.lang.Class.privateGetDeclaredFields(Class.java:2436) at java.lang.Class.getDeclaredField(Class.java:1946) at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659) at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72) at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480) at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468) at java.security.AccessController.doPrivileged(Native Method) at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468) at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365) at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at
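P.S. For completeness, this is roughly how I'd pass the classpath explicitly on spark-submit (a sketch: the main class, jar name, and HIVE_BASE_DIR are placeholders, as in the suggestion above; in yarn-client mode the driver runs in the local JVM, so I'd set the driver classpath as well as the executors'):

# Sketch only; paths and class name are placeholders.
spark-submit \
  --master yarn-client \
  --class com.example.HiveDataFrameTest \
  --driver-class-path "$HIVE_BASE_DIR/hive/lib/*" \
  --conf "spark.executor.extraClassPath=$HIVE_BASE_DIR/hive/lib/*" \
  my-app.jar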
Classpath problem trying to use DataFrames
I'm trying to run a basic "Hello world" type example using DataFrames with Hive in yarn-client mode. My code is:

JavaSparkContext sc = new JavaSparkContext("yarn-client", "Test app");
HiveContext sqlContext = new HiveContext(sc.sc());
sqlContext.sql("SELECT * FROM my_table").count();

The exception I get on the driver is: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.plan.TableDesc

There are no exceptions on the executors. That class is definitely on the classpath of the driver, and it runs without errors in local mode. I haven't been able to find any similar errors on Google. Does anyone know what I'm doing wrong? The full stack trace is included below:

java.lang.NoClassDefFoundError: Lorg/apache/hadoop/hive/ql/plan/TableDesc; at java.lang.Class.getDeclaredFields0(Native Method) at java.lang.Class.privateGetDeclaredFields(Class.java:2436) at java.lang.Class.getDeclaredField(Class.java:1946) at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659) at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72) at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480) at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468) at java.security.AccessController.doPrivileged(Native Method) at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468) at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365) at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990