While working with larger datasets I run into out-of-memory issues.
Basically, a Hadoop sequence file is read, its contents are sorted, and
a Hadoop map file is written back. The code works fine for workloads
greater than 20 GB. Then I changed one column in my dataset to store a
large object, and the size of a row object increased from 20 KB to
about 4 MB. Now the same code runs into Java heap space issues and the
application is shut down with an out-of-memory exception.
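
For context, a minimal sketch of what the job does (the paths, key/value
types, and the String key mapping are my assumptions, not the real
schema):

    import org.apache.hadoop.io.{BytesWritable, Text}
    import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("SortSeqToMapFile").master("local[*]").getOrCreate()
    val sc = spark.sparkContext

    sc.sequenceFile("in/data.seq", classOf[Text], classOf[BytesWritable])
      // Hadoop reuses Writable instances, so copy values before shuffling.
      .map { case (k, v) => (k.toString, v.copyBytes()) }
      .sortByKey() // a MapFile requires keys in ascending order
      .map { case (k, v) => (new Text(k), new BytesWritable(v)) }
      .saveAsNewAPIHadoopFile("out/data.map",
        classOf[Text], classOf[BytesWritable], classOf[MapFileOutputFormat])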

It seems DataFrame sort operations cannot handle large objects. I took
a heap dump and saw a large array of arrays. I would expect such an
object from a collect() operation, where the results of individual
tasks are gathered into one large array on the driver. I know that
groupBy and collect() cause such problems on large datasets, but I
expected a single sort not to run into them. I switched from sort() to
sortWithinPartitions() and the application did not crash. Of course,
the result is not the same. But shouldn't a plain sort() simply work?
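
For illustration, the two calls side by side (the sort column "key" is
a placeholder for my actual column):

    import org.apache.spark.sql.DataFrame

    // sort()/orderBy() builds a total order via a range-partitioning
    // shuffle; sortWithinPartitions() only sorts each partition locally.
    def globalSort(df: DataFrame): DataFrame =
      df.orderBy("key")              // dies with ~4 MB rows in my setup
    def localSort(df: DataFrame): DataFrame =
      df.sortWithinPartitions("key") // survives, but no total order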

I created a simple test program which blows a tiny Int RDD up to a
Row RDD with such large objects, and found out that spilling to disk
does not seem to work out of the box. The defaults and every MEMORY_*
storage level run into this issue; only DISK_ONLY works, but it is
very slow.
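
The test looks roughly like this (row count and payload size are only
illustrative):

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{BinaryType, IntegerType, StructField, StructType}
    import org.apache.spark.storage.StorageLevel

    val spark = SparkSession.builder()
      .appName("LargeRowSortTest").master("local[*]").getOrCreate()

    val schema = StructType(Seq(
      StructField("key", IntegerType, nullable = false),
      StructField("payload", BinaryType, nullable = false)))

    // Blow a tiny Int RDD up to rows carrying a ~4 MB payload each.
    val rows = spark.sparkContext.parallelize(1 to 2000)
      .map(i => Row(i, new Array[Byte](4 * 1024 * 1024)))

    val df = spark.createDataFrame(rows, schema)
    df.persist(StorageLevel.DISK_ONLY) // the only level that avoided the OOM for me
    df.orderBy("key").count()          // force the sort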

I posted a question with example code to Stack Overflow:
https://stackoverflow.com/questions/51546921/apache-spark-dataframe-causes-out-of-memory

My question to the community is: how can a growing amount of data be
sorted without also increasing the heap size?

I found out the following facts:

  * larger datasets require setting spark.driver.maxResultSize to a
    greater value, or 0 for no limit (see the settings sketch after
    this list)
  * the row object size seems to impact memory usage
  * the G1 garbage collector may run into fragmentation issues with
    large ("humongous") objects, so I used the parallel GC instead; in
    my case this had no impact, the heap still runs full after
    processing n tasks
  * reducing driver and executor memory has no effect, the heap always
    fills up in the same way
  * persisting with a DISK_* storage level is no guarantee that Spark
    spills the data to disk
  * using the Kryo serializer has little effect in my case; a few more
    tasks finish before the OOM occurs
  * sortWithinPartitions works, but afterwards only the contents of
    each partition are sorted
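
The settings from the list above, as a sketch (the values are the ones
from my tests; note that in local mode heap and GC flags must be set on
the java command line, not through the session builder):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("LargeObjectSort")
      .master("local[*]")
      .config("spark.driver.maxResultSize", "0") // 0 = no limit
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    // Heap and GC flags go on the java command line in local mode:
    //   -Xms5g -Xmx5g -XX:+UseParallelOldGC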

I assume sortWithinPartitions followed by a merge of the pre-sorted
partitions should be enough to sort the final result. But why does
Spark combine the whole result set on the driver? That is not very
scalable!
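
Since 2.3 that pattern can be written without any driver-side merge:
repartitionByRange gives every partition a non-overlapping key range,
so a local sort afterwards yields a total order when the partitions are
read in sequence ("key" and the partition count are placeholders):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col

    // Non-overlapping key ranges per partition + a local sort = total
    // order, without gathering anything on the driver.
    def rangeSort(df: DataFrame, partitions: Int): DataFrame =
      df.repartitionByRange(partitions, col("key"))
        .sortWithinPartitions("key")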

So I dropped the large column, let Spark sort the other columns, and
finally did a left join to combine the large data back in. The code
runs without an OOM, but the left join loses the sort order. Any ideas?
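
A sketch of this workaround (column names are placeholders); as I
understand it, the join shuffles the rows again, which is why the
ordering is gone afterwards:

    import org.apache.spark.sql.DataFrame

    // Split off the payload, sort the slim part, join the payload back.
    // The join shuffles again, so the sort order does not survive it.
    def sortThenJoin(df: DataFrame): DataFrame = {
      val slim  = df.drop("payload")
      val heavy = df.select("key", "payload")
      slim.orderBy("key").join(heavy, Seq("key"), "left")
    }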

My latest tests are not finished yet, but sorting on the RDD instead of
the DataFrame seems to work better. The code is more complex, and I
suspect the Catalyst optimizer does not choose optimal settings for the
DataFrame version.
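
Roughly what I am testing now (the key extraction is a placeholder):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row}

    // RDD-level sort: range-partitions by key, sorts within partitions,
    // and spills through the shuffle's external sorter.
    def rddSort(df: DataFrame): RDD[Row] =
      df.rdd.sortBy(row => row.getInt(0)) // assumes the key is column 0, an Int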

How can growing datasets be sorted without increasing memory? Is my
code at fault, or is this just a Spark bug?

My setup is:

- Windows 10, Java 1.8u144 (u171) with -Xms5g -Xmx5g and optionally
  -XX:+UseParallelOldGC
- Spark 2.3.1 in local mode (running as a single-node cluster on my
  workstation)
