Hi all, I'm using PySpark to create a corpus of labeled data points. I create an RDD called `corpus`, and then join each newly created feature RDD to it as I go. My code repeats something like this for each feature:
```python
feature = raw_data_rdd.map(...).reduceByKey(...).map(...)  # create feature RDD
corpus = corpus.join(feature).map(lambda x: (x[0], x[1][0] + (x[1][1],)))  # "append" new feature to existing corpus
```

The `corpus` RDD is a key-value pair RDD, where the key is the label and the value is a tuple of the features. I repeat the above for each of the 6 features I'm working with. It looks like I'm running into a memory error when performing the join on the last feature.

Here's some relevant information:

- `raw_data_rdd` has ~50 million entries, while `feature` and `corpus` have ~450k entries each after the map-reduce operations.
- The driver and each of the 6 executor nodes have 6GB of memory available.
- I'm kicking off the script with the following command:

```
pyspark --driver-memory 2G --executor-memory 2G --conf spark.akka.frameSize=64 create_corpus.py
```

My question is: why would I be running out of memory when joining the relatively small `feature` and `corpus` RDDs?

Also, what happens to the "old" `corpus` RDD when I join it and point `corpus` to the new, larger RDD? Does it stay in memory, and could this be the reason I'm running into the issue? If so, is there a better way of "appending" to my `corpus` RDD? Should I be persisting `raw_data_rdd`?

The full error is shown below. Please let me know if I'm missing something obvious. Thank you!
Kevin Mandich

```
Exception in thread "refresh progress" Exception in thread "SparkListenerBus"
[2015-09-04 20:43:14,385] {bash_operator.py:58} INFO - Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "SparkListenerBus"
[2015-09-04 20:43:30,999] {bash_operator.py:58} INFO - Exception in thread "qtp268929808-35" java.lang.OutOfMemoryError: Java heap space
[2015-09-04 20:43:30,999] {bash_operator.py:58} INFO - at java.util.concurrent.locks.AbstractQueuedSynchronizer.addWaiter(AbstractQueuedSynchronizer.java:606)
[2015-09-04 20:43:30,999] {bash_operator.py:58} INFO - at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:883)
[2015-09-04 20:43:31,000] {bash_operator.py:58} INFO - at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1221)
[2015-09-04 20:43:32,562] {bash_operator.py:58} INFO - at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:340)
[2015-09-04 20:43:32,562] {bash_operator.py:58} INFO - at org.spark-project.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:333)
[2015-09-04 20:43:32,563] {bash_operator.py:58} INFO - at org.spark-project.jetty.util.thread.QueuedThreadPool.idleJobPoll(QueuedThreadPool.java:526)
[2015-09-04 20:43:32,563] {bash_operator.py:58} INFO - at org.spark-project.jetty.util.thread.QueuedThreadPool.access$600(QueuedThreadPool.java:44)
[2015-09-04 20:43:32,563] {bash_operator.py:58} INFO - at org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
[2015-09-04 20:43:32,563] {bash_operator.py:58} INFO - at java.lang.Thread.run(Thread.java:745)
[2015-09-04 20:43:32,563] {bash_operator.py:58} INFO - java.lang.OutOfMemoryError: Java heap space
[2015-09-04 20:43:32,563] {bash_operator.py:58} INFO - Exception in thread "qtp1514449570-77" java.lang.OutOfMemoryError: Java heap space
[2015-09-04 20:43:37,366] {bash_operator.py:58} INFO - at java.util.concurrent.ConcurrentHashMap$KeySet.iterator(ConcurrentHashMap.java:1428)
[2015-09-04 20:43:37,366] {bash_operator.py:58} INFO - at org.spark-project.jetty.io.nio.SelectorManager$SelectSet$1.run(SelectorManager.java:712)
[2015-09-04 20:43:37,366] {bash_operator.py:58} INFO - at org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
[2015-09-04 20:43:41,458] {bash_operator.py:58} INFO - at org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
[2015-09-04 20:43:41,459] {bash_operator.py:58} INFO - at java.lang.Thread.run(Thread.java:745)
[2015-09-04 20:55:04,411] {bash_operator.py:58} INFO - Exception in thread "qtp1514449570-72"
[2015-09-04 20:55:04,412] {bash_operator.py:58} INFO - Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "qtp1514449570-72"
[2015-09-04 20:58:25,671] {bash_operator.py:58} INFO - Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
```
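To make the record shapes in the join-and-append loop from the question concrete, here is a plain-Python model of a single step. It is only a sketch under stated assumptions: Python dicts stand in for the `corpus` and `feature` pair RDDs, keys are assumed unique on each side (as they are after `reduceByKey`), and `append_feature` is a hypothetical helper name, not part of PySpark. No Spark is involved.

```python
# Plain-Python model of one "join and append" step from the question.
# Assumption: dicts stand in for the corpus/feature pair RDDs (no Spark here),
# and keys are unique on each side, as after reduceByKey.

def append_feature(corpus, feature):
    """Mimic corpus.join(feature).map(lambda x: (x[0], x[1][0] + (x[1][1],))).

    corpus:  {key: tuple_of_features}
    feature: {key: new_feature_value}
    Like RDD.join, this is an inner join: keys missing on either side are dropped.
    """
    joined = {}
    for key, features in corpus.items():
        if key in feature:
            # Same tuple concatenation as the lambda: old tuple + 1-element tuple
            joined[key] = features + (feature[key],)
    return joined


corpus = {"a": (1,), "b": (2,), "c": (3,)}
feature = {"a": 10, "b": 20}  # "c" has no value for this feature
corpus = append_feature(corpus, feature)
print(corpus)  # {'a': (1, 10), 'b': (2, 20)}
```

The model makes two properties of the original loop visible: the starting corpus values must already be tuples (for example, 1-tuples) for `+` to work, and because `join` is an inner join, any label missing from a single feature RDD drops out of the corpus entirely.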