The error you're receiving is because the Akka frame size must be
a positive Java Integer, i.e., less than 2^31 bytes. However, the frame
size is not intended to be anywhere near the size of the job memory -- it
is the smallest unit of data transfer that Spark performs. In this case,
your "task result" size is exceeding 10MB, which means that the results
for a single partition of your data are >10MB.

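In fact, the "maxFrameLength must be a positive integer: -1451229184"
error further down in your log is the same limit in disguise: the frame
size setting is in MB and gets multiplied out to bytes in 32-bit integer
arithmetic, so your spark.akka.frameSize = 15000 wraps around to a
negative number. A minimal sketch of the arithmetic (plain Java, just to
show the wraparound):

    public class FrameSizeOverflow {
        public static void main(String[] args) {
            int frameSizeMb = 15000;                        // spark.akka.frameSize = 15000
            int frameSizeBytes = frameSizeMb * 1024 * 1024; // 15,728,640,000 does not fit in 32 bits
            System.out.println(frameSizeBytes);             // prints -1451229184, matching your trace
        }
    }
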
It appears that the default JavaWordCount example has a minSplits value of
1 (ctx.textFile(args[1], 1)). This really means that the number of
partitions will be max(1, # HDFS blocks in the file). If you have an HDFS
block of size ~64MB and mostly distinct words, a single task's result may
be around the same size as the block, which is >10MB.

You have two complementary solutions:

   1. Increase the value of minSplits to reduce the size of any single
   task's result, e.g., ctx.textFile(args[1], 256).
   2. Increase the Akka frame size by a small amount (e.g., to 20-70MB);
   see the sketch below.
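
For example, a sketch against the 0.8-era Java API (in those releases,
configuration was passed through Java system properties, which must be set
before the JavaSparkContext is constructed):

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class WordCountFixed {
        public static void main(String[] args) {
            // Must be set before the JavaSparkContext is constructed.
            // Frame size is in MB; a small bump over the 10MB default.
            System.setProperty("spark.akka.frameSize", "20");

            JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount");
            // More splits means each task's result stays well under the frame size.
            JavaRDD<String> lines = ctx.textFile(args[1], 256);
            // ... rest of the word count as in the example ...
        }
    }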

Please note that this issue, while annoying, is in good part due to the
lack of realism of this example. You very rarely call collect() in actual
Spark usage, as that pulls *all* your output data onto the driver machine.
Much more likely, you'd save to an HDFS file or compute the top 100 words
or something like that, which would not have this problem.
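
For instance, assuming counts is the (word, count) JavaPairRDD from the
word-count step, either of the following avoids shipping the full result
to the driver (again a sketch against the 0.8-era API; the output path is
made up):

    // Needs: java.util.List, scala.Tuple2,
    // org.apache.spark.api.java.function.PairFunction

    // Write the full result to HDFS instead of collecting it...
    counts.saveAsTextFile("hdfs://ocnosql76:8030/user/lh1/wordcount_out");

    // ...or bring back only a small, bounded summary: the 100 most frequent
    // words, found by swapping to (count, word) and sorting descending.
    List<Tuple2<Integer, String>> top100 = counts
        .map(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            public Tuple2<Integer, String> call(Tuple2<String, Integer> t) {
                return new Tuple2<Integer, String>(t._2(), t._1());
            }
        })
        .sortByKey(false)
        .take(100);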

(One final note about your configuration: the Spark Worker is simply
responsible for spawning Executors, which do the actual computation. As
such, it is typical not to change the Worker memory at all [it needs very
little] but rather to distribute the majority of a machine's memory
amongst the Executors. If each machine has 16 GB of RAM and 4 cores, for
example, you might set spark.executor.memory between 2 and 3 GB, totaling
8-12 GB used by Spark.)
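
For concreteness, a hypothetical sizing for that 16 GB / 4-core example,
in the same system-property style as above:

    // Per-executor heap: 2-3 GB each leaves headroom for HDFS and the OS.
    System.setProperty("spark.executor.memory", "2g");
    // SPARK_WORKER_MEMORY in conf/spark-env.sh caps the total memory the
    // Worker may hand out to executors on the machine, e.g. 12g.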


On Tue, Dec 24, 2013 at 3:58 AM, leosand...@gmail.com
<leosand...@gmail.com> wrote:

>  Hi, everyone
>
> I have a question about the arg spark.akka.frameSize; its default value is
> 10 MB.
> When I ran JavaWordCount reading data from HDFS (a 7 GB file), there was
> an OOM error caused by a task result exceeding the Akka frame size.
> But when I set the arg to 1G, 2G, or 10G, it shows me
> ERROR ClusterScheduler: Lost executor 1 on ocnosql84: remote Akka client
> shutdown
> 13/12/24 19:41:14 ERROR StandaloneExecutorBackend: Driver terminated or
> disconnected! Shutting down.
>
> Sometimes it shows me different error info:
>  [lh1@ocnosql84 src]$ java MyWordCount spark://ocnosql84:7077
> hdfs://ocnosql76:8030/user/lh1/cdr_ismp_20130218 15000 1g 120
> 13/12/24 19:20:33 ERROR Client$ClientActor: Failed to connect to master
> org.jboss.netty.channel.ChannelPipelineException: Failed to initialize a
> pipeline.
>         at
> org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:209)
>         at
> org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:183)
>         at
> akka.remote.netty.ActiveRemoteClient$$anonfun$connect$1.apply$mcV$sp(Client.scala:173)
>         at akka.util.Switch.liftedTree1$1(LockUtil.scala:33)
>         at akka.util.Switch.transcend(LockUtil.scala:32)
>         at akka.util.Switch.switchOn(LockUtil.scala:55)
>         at akka.remote.netty.ActiveRemoteClient.connect(Client.scala:158)
>         at
> akka.remote.netty.NettyRemoteTransport.send(NettyRemoteSupport.scala:153)
>         at
> akka.remote.RemoteActorRef.$bang(RemoteActorRefProvider.scala:247)
>         at
> org.apache.spark.deploy.client.Client$ClientActor.preStart(Client.scala:61)
>         at akka.actor.ActorCell.create$1(ActorCell.scala:508)
>         at akka.actor.ActorCell.systemInvoke(ActorCell.scala:600)
>         at
> akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:209)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:178)
>         at
> akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:516)
>         at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
>         at
> akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
>         at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1479)
>         at
> akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
> Caused by: java.lang.IllegalArgumentException: maxFrameLength must be a
> positive integer: -1451229184
>         at
> org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder.<init>(LengthFieldBasedFrameDecoder.java:270)
>         at
> org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder.<init>(LengthFieldBasedFrameDecoder.java:236)
>         at
> akka.remote.netty.ActiveRemoteClientPipelineFactory.getPipeline(Client.scala:340)
>         at
> org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:207)
>         ... 18 more
> 13/12/24 19:20:33 ERROR SparkDeploySchedulerBackend: Disconnected from
> Spark cluster!
> 13/12/24 19:20:33 ERROR ClusterScheduler: Exiting due to error from
> cluster scheduler: Disconnected from Spark cluster
>
> It seems to be caused by
> LengthFieldBasedFrameDecoder lenDec = new
> LengthFieldBasedFrameDecoder(this.client.netty().settings().MessageFrameSize(),
> 0, 4, 0, 4);
> I don't know the value of
> this.client.netty().settings().MessageFrameSize() or how to calculate
> this value.
>
> my spark args :
> export SPARK_DAEMON_MEMORY=4000m
> export SPARK_MEM=1000m
> export SPARK_WORKER_MEMORY=8g
> spark.akka.frameSize = 1000 / 2000 / 5000 / 10000 / 15000
> spark.executor.memory  = 1g
> spark.akka.askTimeout = 120
>
>  Any help or reply is very appreciated!  Thanks very much.
>
> ------------------------------
>  leosand...@gmail.com
>
