Thanks for all the answers!
It looks like increasing the heap a little, and setting
spark.sql.shuffle.partitions to a much lower number (I used the recommended
input_size_mb/128 formula), did the trick.
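For reference, this is roughly the heuristic I used (a sketch only; the total
input size below is a made-up placeholder you would measure yourself, and
`spark` is the active SparkSession):

    # Rough sketch of the input_size_mb / 128 heuristic (numbers are placeholders).
    total_input_mb = 50 * 1024                      # e.g. ~50 GB of input JSON
    num_partitions = max(1, total_input_mb // 128)  # aim for ~128 MB per shuffle partition
    spark.conf.set("spark.sql.shuffle.partitions", str(num_partitions))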
As for partitionBy: unless I use repartition("dt") before the writer, it
actually writes more than one output file per "dt" partition, so I guess the
same "dt" value is spread across multiple in-memory partitions, right?

On Mon, Sep 25, 2017 at 11:07 PM ayan guha <guha.a...@gmail.com> wrote:

> Another possible option would be creating a partitioned table in Hive and
> using dynamic partitioning while inserting. This will not require Spark to
> do an explicit partitionBy.
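> A rough sketch of that approach (the database, table, and column names are
> made up for illustration, and it assumes a SparkSession built with
> enableHiveSupport()):
>
> spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
> spark.sql("""
>     CREATE TABLE IF NOT EXISTS my_db.events (payload STRING)
>     PARTITIONED BY (dt STRING)
>     STORED AS PARQUET
> """)
> df.createOrReplaceTempView("staged")
> # Dynamic partitioning routes each row to its dt= partition; no explicit
> # partitionBy in the writer is needed.
> spark.sql("""
>     INSERT INTO TABLE my_db.events PARTITION (dt)
>     SELECT payload, dt FROM staged
> """)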
>
> On Tue, 26 Sep 2017 at 12:39 pm, Ankur Srivastava <
> ankur.srivast...@gmail.com> wrote:
>
>> Hi Amit,
>>
>> Spark keeps the partition that it is working on in memory (and does not
>> spill to disk even if it is running OOM). Also, since you are getting OOM
>> when using partitionBy (and not when you just use flatMap), there is likely
>> one date (or a few) for which the partition size is bigger than the heap.
>> You can do a count by date to check whether there is skew in your data.
>>
>> The way out would be to increase the heap size or to use more columns in
>> partitionBy (like date + hour) to distribute the data better.
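>> For example (a sketch; "dt" and "hour" are whatever your date/hour columns
>> are called, and the output path is a placeholder):
>>
>> # How many rows per date? Large outliers point to the dates that OOM.
>> df.groupBy("dt").count().orderBy("count", ascending=False).show(20)
>>
>> # Partition the output by date and hour to keep each partition smaller.
>> df.write.partitionBy("dt", "hour").json("output/")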
>>
>> Hope this helps!
>>
>> Thanks
>> Ankur
>>
>> On Mon, Sep 25, 2017 at 7:30 PM, 孫澤恩 <gn00710...@gmail.com> wrote:
>>
>>> Hi, Amit,
>>>
>>> Maybe you can change the configuration spark.sql.shuffle.partitions.
>>> The default is 200; changing this property changes the number of tasks
>>> when you are using the DataFrame API.
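>>> For example, in PySpark (the value 50 and the app name below are just
>>> placeholders; pick a value based on your data size):
>>>
>>> from pyspark.sql import SparkSession
>>>
>>> spark = (SparkSession.builder
>>>          .appName("flatten-and-partition")              # placeholder name
>>>          .config("spark.sql.shuffle.partitions", "50")  # placeholder value
>>>          .getOrCreate())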
>>>
>>> On 26 Sep 2017, at 1:25 AM, Amit Sela <amit.s...@venmo.com> wrote:
>>>
>>> I'm trying to run a simple pyspark application that reads from a JSON
>>> file, flattens it (explode), and writes it back as JSON, partitioned by
>>> date using DataFrameWriter.partitionBy(*cols).
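>>> Roughly this (a sketch of the pipeline; the paths and the "items"/"dt"
>>> column names are placeholders for my actual schema):
>>>
>>> from pyspark.sql import functions as F
>>>
>>> df = spark.read.json("input/")                       # placeholder path
>>> flat = df.withColumn("item", F.explode("items")).drop("items")
>>> flat.write.partitionBy("dt").json("output/")         # placeholder path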
>>>
>>> I keep getting OOMEs like:
>>> java.lang.OutOfMemoryError: Java heap space
>>> at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.<init>(UnsafeSorterSpillWriter.java:46)
>>> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:206)
>>> at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203)
>>> .......
>>>
>>> Explode could make the underlying RDD grow a lot, and maybe in an
>>> unbalanced way sometimes; on top of that, partitioning by date (in daily
>>> ETLs, for instance) would probably cause data skew (right?). But why am I
>>> getting OOMs? Isn't Spark supposed to spill to disk if the underlying RDD
>>> is too big to fit in memory?
>>>
>>> If I'm not using "partitionBy" with the writer (still exploding),
>>> everything works fine.
>>>
>>> This happens both on EMR and in a local (Mac) pyspark/spark shell (I
>>> tried both Python and Scala).
>>>
>>> Thanks!
>>>
>>>
>>>
>> --
> Best Regards,
> Ayan Guha
>
