Did you try increasing the parallelism?

Thanks
Best Regards
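P.S. To be concrete, a minimal sketch of what I mean (the partition count of
400 and the RDD name "merged" below are made-up examples for illustration,
not recommendations):

    import org.apache.spark.SparkConf

    // Raise the default shuffle parallelism so each partition -- and hence
    // each serialized cache block -- is smaller.
    val conf = new SparkConf()
      .set("spark.default.parallelism", "400")

    // Or repartition the RDD explicitly before caching it:
    val wider = merged.repartition(400)

With more, smaller partitions, no single block should need a multi-GB Kryo
buffer when it is serialized.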
On Fri, Jan 16, 2015 at 10:41 AM, Anand Mohan <chinn...@gmail.com> wrote:
> We have our Analytics App built on Spark 1.1 Core, Parquet, Avro and Spray.
> We are using the Kryo serializer for the Avro objects read from Parquet,
> with our own custom Kryo registrator (along the lines of ADAM,
> https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala#L51
> ). We just added batched writes and flushes to Kryo's Output for each
> 512 MB in the stream, as below:
>
>     val chunkSize = 512 * 1024 * 1024  // 512 MB
>     outstream.array.grouped(chunkSize).foreach { buf =>
>       kryoOut.write(buf)
>       kryoOut.flush()
>     }
>
> Our queries run against a cached RDD (MEMORY_ONLY) that is obtained by
> 1. loading bulk data from Parquet,
> 2. union-ing it with incremental data in Avro,
> 3. doing timestamp-based duplicate removal (including partitioning in
>    reduceByKey), and
> 4. joining a couple of MySQL tables using JdbcRDD.
>
> Of late, we are seeing major instabilities where the app crashes on a lost
> executor, which itself failed due to an OutOfMemoryError as below. This
> looks almost identical to https://issues.apache.org/jira/browse/SPARK-4885,
> even though we are seeing this error in Spark 1.1.
>
> 2015-01-15 20:12:51,653 [handle-message-executor-13] ERROR
> org.apache.spark.executor.ExecutorUncaughtExceptionHandler - Uncaught
> exception in thread Thread[handle-message-executor-13,5,main]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
>     at java.util.Arrays.copyOf(Arrays.java:2271)
>     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>     at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>     at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
>     at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
>     at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
>     at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
>     at com.esotericsoftware.kryo.io.Output.write(Output.java:183)
>     at com.philips.hc.eici.analytics.streamingservice.AvroSerializer$$anonfun$write$1.apply(AnalyticsKryoRegistrator.scala:31)
>     at com.philips.hc.eici.analytics.streamingservice.AvroSerializer$$anonfun$write$1.apply(AnalyticsKryoRegistrator.scala:30)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at com.philips.hc.eici.analytics.streamingservice.AvroSerializer.write(AnalyticsKryoRegistrator.scala:30)
>     at com.philips.hc.eici.analytics.streamingservice.AvroSerializer.write(AnalyticsKryoRegistrator.scala:18)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>     at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119)
>     at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
>     at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1047)
>     at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1056)
>     at org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:154)
>     at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:421)
>     at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:387)
>     at org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:100)
>     at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:79)
>     at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
>     at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
> The driver log is as below:
>
> 15/01/15 12:12:53 ERROR scheduler.DAGSchedulerActorSupervisor:
> eventProcesserActor failed; shutting down SparkContext
> java.util.NoSuchElementException: key not found: 2539
>     at scala.collection.MapLike$class.default(MapLike.scala:228)
>     at scala.collection.AbstractMap.default(Map.scala:58)
>     at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:799)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:772)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:771)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:771)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:772)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:771)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:771)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:772)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:771)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:771)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$resubmitFailedStages$3.apply(DAGScheduler.scala:586)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$resubmitFailedStages$3.apply(DAGScheduler.scala:585)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>     at org.apache.spark.scheduler.DAGScheduler.resubmitFailedStages(DAGScheduler.scala:585)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1394)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:385)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Any help would be greatly appreciated.
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemory-error-in-Spark-Core-tp21179.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
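One more thought, re: step 3 above -- the reduceByKey in your duplicate
removal is also where you can pass extra parallelism directly. A minimal
sketch, assuming your records are keyed as (businessKey, record) pairs and
each record carries a timestamp field (both names are my guesses; adjust to
your schema):

    // Keep only the latest record per key, spreading the shuffle
    // across 400 partitions (an arbitrary example value).
    val deduped = keyed.reduceByKey(
      (a, b) => if (a.timestamp >= b.timestamp) a else b,
      400  // numPartitions: raise this until no single partition is huge
    )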