Thanks for all the answers! It looks like increasing the heap a little and setting spark.sql.shuffle.partitions to a much lower number (I used the recommended input_size_mb/128 formula) did the trick. As for partitionBy: unless I call repartition("dt") before the writer, it actually writes more than one output file per "dt" directory, so I guess rows with the same "dt" value are spread across multiple in-memory partitions, right?
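For reference, here is roughly what the working job looks like now. This is only a sketch: the paths, the "events" array column, and the input size are placeholders from my setup, not something to copy verbatim.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import explode

    input_size_mb = 50_000  # placeholder: rough total size of the input data

    spark = (
        SparkSession.builder
        .appName("daily-etl")  # placeholder app name
        # size shuffle partitions to ~128 MB of input each,
        # instead of the default 200
        .config("spark.sql.shuffle.partitions", str(max(1, input_size_mb // 128)))
        .getOrCreate()
    )

    df = spark.read.json("s3://bucket/raw/")  # placeholder input path
    # flatten: one output row per element of the "events" array
    flat = df.select("dt", explode("events").alias("event"))

    # repartition("dt") first, so all rows with the same "dt" land in the
    # same task and partitionBy writes a single file per "dt" directory
    (
        flat.repartition("dt")
        .write.partitionBy("dt")
        .mode("overwrite")
        .json("s3://bucket/out/")  # placeholder output path
    )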
On Mon, Sep 25, 2017 at 11:07 PM ayan guha <guha.a...@gmail.com> wrote:

> Another possible option would be creating a partitioned table in Hive and
> using dynamic partitioning while inserting. This does not require Spark to
> do an explicit partitionBy.
>
> On Tue, 26 Sep 2017 at 12:39 pm, Ankur Srivastava <
> ankur.srivast...@gmail.com> wrote:
>
>> Hi Amit,
>>
>> Spark keeps the partition that it is working on in memory (and does not
>> spill it to disk even if it is running out of memory). Also, since you
>> are getting the OOM when using partitionBy (and not when you just use
>> flatMap), there is probably one date (or a few) for which the partition
>> is bigger than the heap. You can do a count by date to check whether
>> there is skew in your data.
>>
>> The way out would be to increase the heap size or to use more columns in
>> partitionBy (like date + hour) to distribute the data better.
>>
>> Hope this helps!
>>
>> Thanks
>> Ankur
>>
>> On Mon, Sep 25, 2017 at 7:30 PM, 孫澤恩 <gn00710...@gmail.com> wrote:
>>
>>> Hi, Amit,
>>>
>>> Maybe you can change the spark.sql.shuffle.partitions configuration.
>>> The default is 200; changing this property changes the number of tasks
>>> when you are using the DataFrame API.
>>>
>>> On 26 Sep 2017, at 1:25 AM, Amit Sela <amit.s...@venmo.com> wrote:
>>>
>>> I'm trying to run a simple PySpark application that reads from file
>>> (JSON), flattens it (explode), and writes back to file (JSON),
>>> partitioned by date using DataFrameWriter.partitionBy(*cols).
>>>
>>> I keep getting OOMEs like:
>>>
>>> java.lang.OutOfMemoryError: Java heap space
>>>     at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.<init>(UnsafeSorterSpillWriter.java:46)
>>>     at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:206)
>>>     at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203)
>>>     ...
>>>
>>> explode can make the underlying RDD grow a lot, sometimes in an
>>> unbalanced way, and on top of that, partitioning by date (in daily ETLs,
>>> for instance) would probably cause data skew (right?). But why am I
>>> getting OOMs? Isn't Spark supposed to spill to disk if the underlying
>>> RDD is too big to fit in memory?
>>>
>>> If I don't use partitionBy with the writer (still exploding), everything
>>> works fine.
>>>
>>> This happens both on EMR and in a local (Mac) pyspark/spark shell (tried
>>> both Python and Scala).
>>>
>>> Thanks!
>>>
>>
> --
> Best Regards,
> Ayan Guha
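P.S. In case it helps anyone finding this thread in the archives: the skew check Ankur suggested boils down to a one-liner. "flat" below is the exploded DataFrame from the snippet at the top of this message, and "dt" is my date column.

    # row counts per date, biggest first, to spot dates too big for the heap
    flat.groupBy("dt").count().orderBy("count", ascending=False).show(20, truncate=False)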