Our analytics app is built on Spark 1.1 Core, Parquet, Avro and Spray.
We use the Kryo serializer for the Avro objects read from Parquet, with a
custom Kryo registrator along the lines of ADAM's:
<https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala#L51>

We just added batched writes and flushes to Kryo's Output for each 512 MB
in the stream, as below:

val chunkSize = 512 * 1024 * 1024  // 512 MB per batch
outstream.array.sliding(chunkSize).foreach(buf => {
  kryoOut.write(buf)
  kryoOut.flush()
})
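
A minimal sketch of the batched write described above, using only java.io
so that it runs without Kryo on the classpath. The names ChunkedWrite and
writeInChunks are hypothetical and are not taken from our registrator.
Since Kryo's Output extends java.io.OutputStream, a kryoOut could be passed
as `out`. One detail worth knowing: on Scala collections, sliding(n) with
the default step of 1 yields overlapping windows, while grouped(n) or
sliding(n, n) yield consecutive, non-overlapping chunks.

```scala
import java.io.OutputStream

object ChunkedWrite {
  /** Writes `bytes` to `out` in consecutive, non-overlapping chunks of at
    * most `chunkSize` bytes, flushing after each chunk.
    * Returns the number of chunks written. */
  def writeInChunks(bytes: Array[Byte], out: OutputStream, chunkSize: Int): Int = {
    require(chunkSize > 0, "chunkSize must be positive")
    var chunks = 0
    var offset = 0
    while (offset < bytes.length) {
      val len = math.min(chunkSize, bytes.length - offset)
      out.write(bytes, offset, len) // writes a slice without copying it first
      out.flush()
      offset += len
      chunks += 1
    }
    chunks
  }
}
```

Chunking of this kind only bounds the size of each individual write call.
The total number of bytes that reaches the underlying stream is unchanged.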

Our queries run against a cached RDD (MEMORY_ONLY) that is obtained by
1. loading bulk data from Parquet,
2. union-ing it with incremental data in Avro,
3. doing timestamp-based duplicate removal (including partitioning in
reduceByKey), and
4. joining a couple of MySQL tables using JdbcRDD.
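
Steps 2 and 3 can be sketched roughly as follows. Plain Scala collections
stand in for RDDs so the example runs without Spark, and the Event shape,
the Dedup object and the function names are all hypothetical (the real
Avro schema is not shown here). With Spark, the same `latest` function
would be the argument to reduceByKey on an RDD keyed by record id.

```scala
object Dedup {
  // Hypothetical record shape, for illustration only.
  final case class Event(id: String, timestamp: Long, payload: String)

  /** Keeps the newer of two records with the same key; ties keep `a`. */
  def latest(a: Event, b: Event): Event =
    if (b.timestamp > a.timestamp) b else a

  /** Local stand-in for: rdd.map(e => (e.id, e)).reduceByKey(latest).values
    * The result is sorted by id only to make it deterministic. */
  def dedupe(events: Seq[Event]): Seq[Event] =
    events.groupBy(_.id).values.map(_.reduce(latest)).toSeq.sortBy(_.id)
}
```

For example, calling Dedup.dedupe(bulk ++ incremental) keeps one record
per id, the one with the highest timestamp.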

Of late, we are seeing major instability: the app crashes on a lost
executor, which itself failed due to an OutOfMemoryError, as shown below.
This looks almost identical to
https://issues.apache.org/jira/browse/SPARK-4885, even though we are
seeing this error in Spark 1.1.

2015-01-15 20:12:51,653 [handle-message-executor-13] ERROR org.apache.spark.executor.ExecutorUncaughtExceptionHandler - Uncaught exception in thread Thread[handle-message-executor-13,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.util.Arrays.copyOf(Arrays.java:2271)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
        at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
        at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
        at com.esotericsoftware.kryo.io.Output.write(Output.java:183)
        at com.philips.hc.eici.analytics.streamingservice.AvroSerializer$$anonfun$write$1.apply(AnalyticsKryoRegistrator.scala:31)
        at com.philips.hc.eici.analytics.streamingservice.AvroSerializer$$anonfun$write$1.apply(AnalyticsKryoRegistrator.scala:30)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at com.philips.hc.eici.analytics.streamingservice.AvroSerializer.write(AnalyticsKryoRegistrator.scala:30)
        at com.philips.hc.eici.analytics.streamingservice.AvroSerializer.write(AnalyticsKryoRegistrator.scala:18)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119)
        at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
        at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1047)
        at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1056)
        at org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:154)
        at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:421)
        at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:387)
        at org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:100)
        at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:79)
        at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
        at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
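
The trace above fails inside ByteArrayOutputStream.grow, reached from
BlockManager.dataSerialize via MemoryStore.getBytes. As a rough model only
(an assumption for illustration, not the JDK's exact growth policy), the
sketch below shows the capacities a doubling byte buffer steps through and
the first step that no longer fits in a single JVM array, whose length is
bounded by Int.MaxValue. The object and function names are hypothetical,
and the initial capacity of 32 in the usage note is just an example value.

```scala
object BufferGrowth {
  // No JVM array can hold more than Int.MaxValue elements.
  val MaxArrayLength: Long = Int.MaxValue.toLong

  /** Capacities a doubling buffer passes through, starting at `initial`,
    * until it can hold `totalBytes`. Long arithmetic avoids overflow. */
  def growthSteps(initial: Long, totalBytes: Long): List[Long] = {
    require(initial > 0, "initial capacity must be positive")
    @annotation.tailrec
    def loop(cap: Long, acc: List[Long]): List[Long] =
      if (cap >= totalBytes) (cap :: acc).reverse
      else loop(cap * 2, cap :: acc)
    loop(initial, Nil)
  }

  /** First requested capacity that exceeds the array length limit, if any. */
  def firstOversizedStep(initial: Long, totalBytes: Long): Option[Long] =
    growthSteps(initial, totalBytes).find(_ > MaxArrayLength)
}
```

Under this model, BufferGrowth.firstOversizedStep(32L, 1500L * 1024 * 1024)
reports an oversized request for a 1.5 GB payload, because the step after
1 GB is 2 GB, which is one byte past Int.MaxValue.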


The driver log shows the following:

15/01/15 12:12:53 ERROR scheduler.DAGSchedulerActorSupervisor: eventProcesserActor failed; shutting down SparkContext
java.util.NoSuchElementException: key not found: 2539
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:58)
        at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:772)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:771)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:771)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:772)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:771)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:771)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:772)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:771)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:771)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$resubmitFailedStages$3.apply(DAGScheduler.scala:586)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$resubmitFailedStages$3.apply(DAGScheduler.scala:585)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at org.apache.spark.scheduler.DAGScheduler.resubmitFailedStages(DAGScheduler.scala:585)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1394)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:385)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Any help would be greatly appreciated.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemory-error-in-Spark-Core-tp21179.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
