Hi All,

I'm using PySpark to create a corpus of labeled data points. I create an
RDD called corpus, and then join each newly-created feature RDD to it as I
go. My code repeats something like this for each feature:

feature = raw_data_rdd.map(...).reduceByKey(...).map(...)  # create feature RDD
corpus = corpus.join(feature).map(lambda x: (x[0], x[1][0] + (x[1][1],)))  # "append" new feature to existing corpus

The corpus RDD holds key-value pairs, where the key is the label and the
value is a tuple of the features. I repeat the above for the 6 features I'm
working with. It looks like I'm running into a memory error when performing
the join on the last feature. Here's some relevant information:

- raw_data_rdd has ~50 million entries, while feature and corpus have ~450k each after the map-reduce operations
- The driver and each of the 6 executor nodes have 6GB of memory available
- I'm kicking off the script using the following: pyspark --driver-memory 2G --executor-memory 2G --conf spark.akka.frameSize=64 create_corpus.py

My question is: why would I be running out of memory when joining the
relatively small feature and corpus RDDs? Also, what happens to the "old"
corpus RDD when I join it and point corpus to the new, larger RDD? Does it
stay in memory, and could that be the reason I'm running into this issue?
If so, is there a better way of "appending" to my corpus RDD? Should I be
persisting raw_data_rdd? The full error is shown below.
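
To make the "better way of appending" part of my question concrete, the
alternative I've been wondering about is something like this (sketch only;
feature_builders stands in for the six feature pipelines, and sc is the
SparkContext):

raw_data_rdd.persist()   # avoid recomputing the ~50M-row source for every feature?

# build all six (label, value) feature RDDs up front
features = [build(raw_data_rdd) for build in feature_builders]

# tag each value with its feature index so the final tuple keeps a stable order
tagged = [f.mapValues(lambda v, i=i: (i, v)) for i, f in enumerate(features)]

# one union + groupByKey instead of six chained joins
corpus = sc.union(tagged).groupByKey() \
           .mapValues(lambda vals: tuple(v for _, v in sorted(vals)))

# note: unlike the chained inner joins, this keeps labels that are missing a
# feature; filtering on len(value) == 6 would restore the join behaviour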

Please let me know if I'm missing something obvious. Thank you!

Kevin Mandich


Exception in thread "refresh progress" Exception in thread "SparkListenerBus"
[2015-09-04 20:43:14,385] {bash_operator.py:58} INFO - Exception:
java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in
thread "SparkListenerBus"
[2015-09-04 20:43:30,999] {bash_operator.py:58} INFO - Exception in
thread "qtp268929808-35" java.lang.OutOfMemoryError: Java heap space
[2015-09-04 20:43:30,999] {bash_operator.py:58} INFO - at
java.util.concurrent.locks.AbstractQueuedSynchronizer.addWaiter(AbstractQueuedSynchronizer.java:606)
[2015-09-04 20:43:30,999] {bash_operator.py:58} INFO - at
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:883)
[2015-09-04 20:43:31,000] {bash_operator.py:58} INFO - at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1221)
[2015-09-04 20:43:32,562] {bash_operator.py:58} INFO - at
java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:340)
[2015-09-04 20:43:32,562] {bash_operator.py:58} INFO - at
org.spark-project.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:333)
[2015-09-04 20:43:32,563] {bash_operator.py:58} INFO - at
org.spark-project.jetty.util.thread.QueuedThreadPool.idleJobPoll(QueuedThreadPool.java:526)
[2015-09-04 20:43:32,563] {bash_operator.py:58} INFO - at
org.spark-project.jetty.util.thread.QueuedThreadPool.access$600(QueuedThreadPool.java:44)
[2015-09-04 20:43:32,563] {bash_operator.py:58} INFO - at
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
[2015-09-04 20:43:32,563] {bash_operator.py:58} INFO - at
java.lang.Thread.run(Thread.java:745)
[2015-09-04 20:43:32,563] {bash_operator.py:58} INFO -
java.lang.OutOfMemoryError: Java heap space
[2015-09-04 20:43:32,563] {bash_operator.py:58} INFO - Exception in
thread "qtp1514449570-77" java.lang.OutOfMemoryError: Java heap space
[2015-09-04 20:43:37,366] {bash_operator.py:58} INFO - at
java.util.concurrent.ConcurrentHashMap$KeySet.iterator(ConcurrentHashMap.java:1428)
[2015-09-04 20:43:37,366] {bash_operator.py:58} INFO - at
org.spark-project.jetty.io.nio.SelectorManager$SelectSet$1.run(SelectorManager.java:712)
[2015-09-04 20:43:37,366] {bash_operator.py:58} INFO - at
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
[2015-09-04 20:43:41,458] {bash_operator.py:58} INFO - at
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
[2015-09-04 20:43:41,459] {bash_operator.py:58} INFO - at
java.lang.Thread.run(Thread.java:745)
[2015-09-04 20:55:04,411] {bash_operator.py:58} INFO - Exception in
thread "qtp1514449570-72"
[2015-09-04 20:55:04,412] {bash_operator.py:58} INFO - Exception:
java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in
thread "qtp1514449570-72"
[2015-09-04 20:58:25,671] {bash_operator.py:58} INFO - Exception in
thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java
heap space
