Did you try increasing the parallelism?
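
If the cached/shuffled partitions are few and large, raising the default parallelism (or calling `rdd.repartition(n)` before `.cache()`) splits the data into smaller blocks, each of which then serializes to a smaller byte array. A sketch of the configuration route; the value 200 is illustrative, not tuned for your workload:

```properties
# spark-defaults.conf -- illustrative value, not tuned for any workload.
# More, smaller partitions keep each serialized block well below the
# JVM's ~2 GB byte-array limit that the OOM below runs into.
spark.default.parallelism      200
```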

Thanks,
Best Regards

On Fri, Jan 16, 2015 at 10:41 AM, Anand Mohan <chinn...@gmail.com> wrote:

> We have our Analytics App built on Spark 1.1 Core, Parquet, Avro, and Spray.
> We are using the Kryo serializer for the Avro objects read from Parquet, with
> our own custom Kryo registrator along the lines of ADAM (
> https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala#L51
> ). We just added batched writes and flushes to Kryo's Output for each 512 MB
> in the stream, roughly as below:
>
> val chunkSize = 512 * 1024 * 1024 // 512 MB
> outstream.array.grouped(chunkSize).foreach { buf =>
>   kryoOut.write(buf)
>   kryoOut.flush()
> }
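
The chunked-write loop above can be exercised standalone outside Spark. In this sketch, Kryo's Output is replaced by a plain java.io BufferedOutputStream purely for illustration, and the chunk size is shrunk so the example runs on toy data:

```scala
import java.io.{BufferedOutputStream, ByteArrayOutputStream}

object ChunkedWrite {
  // Write `data` to `out` in fixed-size chunks, flushing after each chunk --
  // the same batched-write-and-flush pattern as the Kryo snippet above,
  // with Kryo's Output swapped for a BufferedOutputStream.
  def writeChunked(data: Array[Byte], out: BufferedOutputStream, chunkSize: Int): Unit = {
    data.grouped(chunkSize).foreach { buf =>
      out.write(buf)
      out.flush()
    }
  }

  def main(args: Array[String]): Unit = {
    val data = Array.tabulate[Byte](1000)(i => (i % 128).toByte)
    val sink = new ByteArrayOutputStream()
    val out  = new BufferedOutputStream(sink)
    writeChunked(data, out, chunkSize = 256) // chunks of 256, 256, 256, 232 bytes
    out.flush()
    assert(sink.toByteArray.sameElements(data))
  }
}
```

Note that the flushes bound how much sits in the buffered layer, but they do not help if the *destination* is an in-memory byte array that must hold the entire serialized block, which is what the stack trace below shows.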
>
> Our queries run against a cached RDD (MEMORY_ONLY) that is obtained by:
> 1. loading bulk data from Parquet,
> 2. unioning it with incremental data in Avro,
> 3. doing timestamp-based duplicate removal (including partitioning in
> reduceByKey), and
> 4. joining a couple of MySQL tables using JdbcRdd.
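
Step 3 above amounts to keeping, per key, the record with the latest timestamp; on an RDD this is typically `reduceByKey((a, b) => if (a.ts >= b.ts) a else b)`. A minimal plain-Scala sketch of the same reduction on a local collection (the record shape and names here are hypothetical, not from the original app):

```scala
object DedupSketch {
  // Hypothetical record shape: a key, a timestamp, and a payload.
  case class Record(key: String, ts: Long, value: String)

  // Keep the latest record per key -- the local-collection analogue of
  // reduceByKey((a, b) => if (a.ts >= b.ts) a else b) on an RDD[(String, Record)].
  def dedupLatest(records: Seq[Record]): Map[String, Record] =
    records
      .groupBy(_.key)
      .map { case (k, rs) => k -> rs.maxBy(_.ts) }

  def main(args: Array[String]): Unit = {
    val rs = Seq(
      Record("a", 1L, "stale"),
      Record("a", 2L, "fresh"),
      Record("b", 5L, "only")
    )
    val latest = dedupLatest(rs)
    assert(latest("a").value == "fresh") // older duplicate for "a" dropped
    assert(latest("b").value == "only")
  }
}
```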
>
> Of late, we are seeing major instabilities where the app crashes on a lost
> executor, which itself failed due to an OutOfMemoryError, as below. This
> looks almost identical to https://issues.apache.org/jira/browse/SPARK-4885,
> even though we are seeing this error in Spark 1.1.
>
> 2015-01-15 20:12:51,653 [handle-message-executor-13] ERROR org.apache.spark.executor.ExecutorUncaughtExceptionHandler - Uncaught exception in thread Thread[handle-message-executor-13,5,main]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
>         at java.util.Arrays.copyOf(Arrays.java:2271)
>         at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>         at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>         at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>         at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>         at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
>         at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
>         at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
>         at com.esotericsoftware.kryo.io.Output.write(Output.java:183)
>         at com.philips.hc.eici.analytics.streamingservice.AvroSerializer$$anonfun$write$1.apply(AnalyticsKryoRegistrator.scala:31)
>         at com.philips.hc.eici.analytics.streamingservice.AvroSerializer$$anonfun$write$1.apply(AnalyticsKryoRegistrator.scala:30)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at com.philips.hc.eici.analytics.streamingservice.AvroSerializer.write(AnalyticsKryoRegistrator.scala:30)
>         at com.philips.hc.eici.analytics.streamingservice.AvroSerializer.write(AnalyticsKryoRegistrator.scala:18)
>         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>         at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119)
>         at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
>         at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1047)
>         at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1056)
>         at org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:154)
>         at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:421)
>         at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:387)
>         at org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:100)
>         at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:79)
>         at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
>         at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
>
> The driver log is as below
>
> 15/01/15 12:12:53 ERROR scheduler.DAGSchedulerActorSupervisor: eventProcesserActor failed; shutting down SparkContext
> java.util.NoSuchElementException: key not found: 2539
>         at scala.collection.MapLike$class.default(MapLike.scala:228)
>         at scala.collection.AbstractMap.default(Map.scala:58)
>         at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:799)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:772)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:771)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:771)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:772)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:771)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:771)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:772)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:771)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:771)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$resubmitFailedStages$3.apply(DAGScheduler.scala:586)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$resubmitFailedStages$3.apply(DAGScheduler.scala:585)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>         at org.apache.spark.scheduler.DAGScheduler.resubmitFailedStages(DAGScheduler.scala:585)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1394)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:385)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> Any help would be greatly appreciated.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemory-error-in-Spark-Core-tp21179.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
