Hi,

We are using Spark 1.5.2 and recently started hitting this issue after our
dataset grew from 140GB to 160GB. The error appears to be thrown during shuffle
fetch on the reduce side, which should all happen on the executors, so I would
expect the executors to report it. However, it gets reported only on the
driver. The SparkContext gets shut down from the driver side after this error
occurs.

Here's what I see in the driver logs:



2016-04-04 03:51:32,889 INFO [sparkDriver-akka.actor.default-dispatcher-17] org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 3 to hdn3.mycomp:37339
2016-04-04 03:51:32,890 INFO [sparkDriver-akka.actor.default-dispatcher-17] org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 3 to hdn3.mycomp:57666
2016-04-04 03:51:33,133 INFO [sparkDriver-akka.actor.default-dispatcher-21] org.apache.spark.storage.BlockManagerInfo: Removed broadcast_12_piece0 on 10.250.70.117:42566 in memory (size: 1939.0 B, free: 232.5 MB)
2016-04-04 03:51:38,432 ERROR [sparkDriver-akka.actor.default-dispatcher-14] org.apache.spark.rpc.akka.ErrorMonitor: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
at com.google.protobuf.AbstractMessageLite.toByteArray(AbstractMessageLite.java:62)
at akka.remote.transport.AkkaPduProtobufCodec$.constructMessage(AkkaPduCodec.scala:138)
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:740)
at akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:718)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2016-04-04 03:51:38,432 ERROR [sparkDriver-akka.actor.default-dispatcher-21] akka.actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
at com.google.protobuf.AbstractMessageLite.toByteArray(AbstractMessageLite.java:62)
at akka.remote.transport.AkkaPduProtobufCodec$.constructMessage(AkkaPduCodec.scala:138)
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:740)
at akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:718)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2016-04-04 03:51:40,246 ERROR [sparkDriver-akka.actor.default-dispatcher-4] akka.actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842)
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743)
at akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:718)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2016-04-04 03:51:41,784 ERROR [sparkDriver-akka.actor.default-dispatcher-4] akka.actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
2016-04-04 03:51:43,354 ERROR [sparkDriver-akka.actor.default-dispatcher-4] akka.actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
2016-04-04 03:51:43,355 INFO [sparkDriver-akka.actor.default-dispatcher-4] akka.remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
2016-04-04 03:51:43,358 INFO [sparkDriver-akka.actor.default-dispatcher-4] akka.remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
2016-04-04 03:51:43,434 INFO [sparkDriver-akka.actor.default-dispatcher-4] akka.remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
2016-04-04 03:51:44,361 ERROR [Yarn application state monitor] org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend: Yarn application has already exited with state FINISHED!



Is this really happening during shuffle fetch on the reducer side?

Is Spark hitting the 2GB byte array limit while serializing before the shuffle?
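
To make the 2GB question concrete, here is a rough sketch of the arithmetic I have in mind. A JVM array is indexed by a signed 32-bit int, so a single byte[] holds at most 2^31 - 1 bytes (some JVMs allow slightly fewer), and ByteArrayOutputStream.grow (as seen in the third trace) at least doubles its buffer, so the old and new buffers are briefly live together. The function names and the sizes used below are hypothetical and for illustration only; nothing here is measured from our job, and the error above may simply mean the driver heap filled up well before that limit.

```python
# Upper bound on the length of a single JVM byte[]: arrays are indexed by a
# signed 32-bit int. Some JVMs cap this a few elements lower.
JVM_MAX_ARRAY_BYTES = 2**31 - 1


def fits_in_one_byte_array(payload_bytes: int) -> bool:
    """Return True if a payload of this size could fit in one JVM byte[]."""
    return 0 <= payload_bytes <= JVM_MAX_ARRAY_BYTES


def heap_needed_during_grow(current_capacity: int, min_capacity: int) -> int:
    """Approximate heap held while a ByteArrayOutputStream-style buffer grows.

    Assumption: the buffer doubles (or jumps to min_capacity if doubling is
    not enough), and both the old and the new array are live during the copy.
    The clamp applied near the maximum array size is ignored here.
    """
    new_capacity = max(current_capacity * 2, min_capacity)
    return current_capacity + new_capacity


if __name__ == "__main__":
    one_gib = 1024**3  # hypothetical serialized message size
    print(fits_in_one_byte_array(one_gib))
    print(heap_needed_during_grow(one_gib, one_gib + 1))
```

If this reasoning holds, a serialized message well under 2GB could still exhaust the driver heap, because growing the buffer transiently needs roughly three times its current size.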

What is a good solution here?


Thanks
Nirav
