The error you're receiving is because the Akka frame size must be a positive Java Integer, i.e., less than 2^31 bytes. The frame size is not intended to be anywhere near the size of the job memory -- it is a cap on the size of any single message (such as a task result) that Spark sends over Akka. Since the setting is in megabytes, your values of 1000-15000 overflow a 32-bit integer once converted to bytes; 15000 MB, for instance, wraps around to exactly the -1451229184 in your stack trace. The underlying problem is that your "task result" size is exceeding 10MB, which means that returning the results for a single partition of your data is >10MB.
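For reference, on the 0.8 line this setting is read from a Java system property, so you have to set it before the SparkContext is constructed or it won't take effect. A minimal sketch (the value is illustrative; the master and jar arguments follow the standard example):

    // Must run before the JavaSparkContext is created; the value is in MB.
    System.setProperty("spark.akka.frameSize", "20"); // modest bump over the 10MB default

    JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
        System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaWordCount.class));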
It appears that the default JavaWordCount example uses a minSplits value of 1 (ctx.textFile(args[1], 1)). This really means that the number of partitions will be max(1, # HDFS blocks in file). If you have HDFS blocks of ~64MB and all-distinct words, the result of a single task may be around the same size, which is well over 10MB.

You have two complementary solutions:

1. Increase the value of minSplits to reduce the size of any single task result, like: ctx.textFile(args[1], 256)
2. Increase the Akka frame size by a small amount (e.g., to 20-70MB).

(A short sketch combining both fixes is included after the quoted message below.)

Please note that this issue, while annoying, is largely due to the lack of realism of this example. You very rarely call collect() in actual Spark usage, as that pulls *all* your output data onto the driver machine. Much more likely you'd save to an HDFS file, or compute the top 100 words or something like that, which would not have this problem.

(One final note about your configuration: the Spark Worker is simply responsible for spawning Executors, which do the actual computation. As such, it is typical not to change the Worker memory at all [it needs very little], but rather to give the majority of a machine's memory to the Executors. If each machine has 16GB of RAM and 4 cores, for example, you might set spark.executor.memory between 2 and 3GB, for a total of 8-12GB used by Spark.)

On Tue, Dec 24, 2013 at 3:58 AM, leosand...@gmail.com <leosand...@gmail.com> wrote:

> Hi, everyone
>
> I have a question about the arg spark.akka.frameSize; its default value is 10MB.
> When I run JavaWordCount reading a 7GB file from HDFS, there is an OOM error caused
> by some task result exceeding the Akka frame size.
> But when I set the arg to 1G, 2G, or 10G, it shows me:
>
> ERROR ClusterScheduler: Lost executor 1 on ocnosql84: remote Akka client shutdown
> 13/12/24 19:41:14 ERROR StandaloneExecutorBackend: Driver terminated or disconnected! Shutting down.
>
> Sometimes it shows me a different error:
>
> [lh1@ocnosql84 src]$ java MyWordCount spark://ocnosql84:7077 hdfs://ocnosql76:8030/user/lh1/cdr_ismp_20130218 15000 1g 120
> 13/12/24 19:20:33 ERROR Client$ClientActor: Failed to connect to master
> org.jboss.netty.channel.ChannelPipelineException: Failed to initialize a pipeline.
>         at org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:209)
>         at org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:183)
>         at akka.remote.netty.ActiveRemoteClient$$anonfun$connect$1.apply$mcV$sp(Client.scala:173)
>         at akka.util.Switch.liftedTree1$1(LockUtil.scala:33)
>         at akka.util.Switch.transcend(LockUtil.scala:32)
>         at akka.util.Switch.switchOn(LockUtil.scala:55)
>         at akka.remote.netty.ActiveRemoteClient.connect(Client.scala:158)
>         at akka.remote.netty.NettyRemoteTransport.send(NettyRemoteSupport.scala:153)
>         at akka.remote.RemoteActorRef.$bang(RemoteActorRefProvider.scala:247)
>         at org.apache.spark.deploy.client.Client$ClientActor.preStart(Client.scala:61)
>         at akka.actor.ActorCell.create$1(ActorCell.scala:508)
>         at akka.actor.ActorCell.systemInvoke(ActorCell.scala:600)
>         at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:209)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:178)
>         at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:516)
>         at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
>         at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
>         at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1479)
>         at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
> Caused by: java.lang.IllegalArgumentException: maxFrameLength must be a positive integer: -1451229184
>         at org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder.<init>(LengthFieldBasedFrameDecoder.java:270)
>         at org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder.<init>(LengthFieldBasedFrameDecoder.java:236)
>         at akka.remote.netty.ActiveRemoteClientPipelineFactory.getPipeline(Client.scala:340)
>         at org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:207)
>         ... 18 more
> 13/12/24 19:20:33 ERROR SparkDeploySchedulerBackend: Disconnected from Spark cluster!
> 13/12/24 19:20:33 ERROR ClusterScheduler: Exiting due to error from cluster scheduler: Disconnected from Spark cluster
>
> It seems to be caused by:
>
> LengthFieldBasedFrameDecoder lenDec = new LengthFieldBasedFrameDecoder(this.client.netty().settings().MessageFrameSize(), 0, 4, 0, 4);
>
> I don't know what the value of this.client.netty().settings().MessageFrameSize() is, or how that value is calculated.
>
> My spark args:
>
> export SPARK_DAEMON_MEMORY=4000m
> export SPARK_MEM=1000m
> export SPARK_WORKER_MEMORY=8g
> spark.akka.frameSize = 1000 / 2000 / 5000 / 10000 / 15000
> spark.executor.memory = 1g
> spark.akka.askTimeout = 120
>
> Any help or reply is very appreciated! Thanks very much
>
> ------------------------------
> leosand...@gmail.com
>
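P.S. For concreteness, here is a rough end-to-end sketch of the two fixes in the 0.8-style Java API. The partition count, the frame-size value, and the use of args[2] as an output path are illustrative choices on my part, not tuned values:

    import java.util.Arrays;
    import scala.Tuple2;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;

    public class JavaWordCountFixed {
      public static void main(String[] args) {
        // Fix 2: a modest frame-size bump (in MB), set before the context exists.
        System.setProperty("spark.akka.frameSize", "20");

        JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
            System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaWordCountFixed.class));

        // Fix 1: ask for many more partitions so no single task result is huge.
        JavaRDD<String> lines = ctx.textFile(args[1], 256);

        JavaPairRDD<String, Integer> counts = lines
            .flatMap(new FlatMapFunction<String, String>() {
              public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
            })
            .map(new PairFunction<String, String, Integer>() {
              public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); }
            })
            .reduceByKey(new Function2<Integer, Integer, Integer>() {
              public Integer call(Integer a, Integer b) { return a + b; }
            });

        // Rather than collect()ing everything onto the driver, write the
        // output back to HDFS from the executors.
        counts.saveAsTextFile(args[2]);
      }
    }

The point of that last line is that saveAsTextFile keeps the output distributed across the cluster, so no single task result ever has to squeeze through an Akka frame on its way back to the driver.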