Hi,

I'm running a job on Spark 1.5.2 and I get an OutOfMemoryError when a
broadcast variable is accessed. The thing is, I don't understand why the
broadcast keeps growing, and why it does so at this point in the code.

Basically, I have a large input file where each line has a key. I group
the lines by key so that I end up with one object holding all the data
related to a given key. Then I map over these objects, and for each object
I iterate over a collection that is shipped as a broadcast variable. The
exception is thrown while iterating over the broadcast variable.

Here is a quick example:

Input file:

key1,row1
key2,row1
key1,row2
key2,row2

The broadcast variable is a list: List(1, 2, 3)
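
It is created the usual way, something like this (myBroadcast is just a
placeholder name, sc is the SparkContext):

    val myBroadcast = sc.broadcast(List(1, 2, 3))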

I group my input by key:

key1, (row1,row2)
key2, (row1,row2)
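
In code, the grouping is roughly the following (the path and the parsing
are simplified placeholders, not the real job):

    val grouped = sc.textFile("input.txt")
      .map { line =>
        val Array(key, row) = line.split(",", 2) // "key1,row1" -> ("key1", "row1")
        (key, row)
      }
      .groupByKey() // one (key, Iterable[row]) pair per key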

Then I do a map:

    myRdd.map { obj =>
      for (item <- myBroadcast.value) executeFunction(obj, item)
    }
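
If I read the stack trace correctly, it is this first access to
myBroadcast.value inside the task that triggers the broadcast block to be
fetched and unrolled by the MemoryStore, and that is where the OOM happens.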

I know groupByKey is a very costly operation, but I'm not sure I can avoid
it, since 'executeFunction' needs all the lines for a given key in order to
run. Besides, the stage where the groupByKey is performed completes
successfully before the exception is thrown.

Here is an extract from the logs:

16/08/04 03:17:50 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.3 GB so far)
16/08/04 03:18:37 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.3 GB so far)
16/08/04 03:19:22 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.4 GB so far)
16/08/04 03:20:07 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.5 GB so far)
16/08/04 03:20:53 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.6 GB so far)
16/08/04 03:21:11 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.7 GB so far)
16/08/04 03:21:15 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.5 GB so far)
16/08/04 03:44:22 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 3.4 GB so far)
16/08/04 03:53:03 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 3.8 GB so far)
16/08/04 04:02:00 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 4.1 GB so far)
16/08/04 04:20:52 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
16/08/04 04:20:52 ERROR executor.Executor: Exception in task 1.0 in stage 62.1 (TID 1109)
java.lang.OutOfMemoryError: Java heap space
    at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
    at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
    at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:159)
    at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:229)
    at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:194)
    at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:186)
    at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:54)
    at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
    at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
    at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
    at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:165)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:550)
    at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:429)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:168)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1175)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)

...

    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.util.collection.CompactBuffer$$anon$1.foreach(CompactBuffer.scala:115)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.util.collection.CompactBuffer.foreach(CompactBuffer.scala:30)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at org.apache.spark.util.collection.CompactBuffer.map(CompactBuffer.scala:30)

Any suggestions as to what could explain this behavior?

Thanks!
