Hi,

Thanks for the prompt reply.
I checked the code. The main issue is the large number of mappers. If the
number of mappers is reduced to around 1000, the problem goes away (a rough
sketch of the change is below). I hope the bug gets fixed in an upcoming
release.
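
For reference, here is roughly what the change looks like against the snippet
quoted below. This is only a sketch; the partition counts (1000) are example
values, not tuned numbers:

        // Same input as before, but cap the number of map partitions at about
        // 1,000 instead of 10,000 so the serialized map output statuses stay
        // well under the Akka frame size.
        JavaPairRDD<LongWritable, Text> rows =
                sc.sequenceFile(args[0], LongWritable.class, Text.class);
        rows = rows.coalesce(1000);

        // Optionally also fix the number of reducers explicitly (Josh's other
        // suggestion below) instead of relying on the default parallelism.
        JavaPairRDD<Character, Long> counts =
                pairs.reduceByKey((a, b) -> a + b, 1000);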

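Until there is a proper fix, Josh's suggestion of raising the Akka frame size
also works around it. A sketch of the driver-side setting, assuming the value
is in MB (128 here is just an example, not a recommendation):

        SparkConf conf = new SparkConf()
                // spark.akka.frameSize is in MB; the default of 10 matches the
                // 10485760-byte limit in the error below.
                .set("spark.akka.frameSize", "128");
        JavaSparkContext sc = new JavaSparkContext(conf);
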
On Mon, Jan 5, 2015 at 1:26 AM, Josh Rosen <rosenvi...@gmail.com> wrote:

> Ah, so I guess this *is* still an issue since we needed to use a bitmap
> for tracking zero-sized blocks (see
> https://issues.apache.org/jira/browse/SPARK-3740; this isn't just a
> performance issue; it's necessary for correctness).  This will require a
> bit more effort to fix, since we'll either have to find a way to use a
> fixed size / capped size encoding for MapOutputStatuses (which might
> require changes to let us fetch empty blocks safely) or figure out some
> other strategy for shipping these statuses.
>
> I've filed https://issues.apache.org/jira/browse/SPARK-5077 to try to
> come up with a proper fix.  In the meantime, I recommend that you increase
> your Akka frame size.
>
> On Sat, Jan 3, 2015 at 8:51 PM, Saeed Shahrivari <
> saeed.shahriv...@gmail.com> wrote:
>
>> I use the 1.2 version.
>>
>> On Sun, Jan 4, 2015 at 3:01 AM, Josh Rosen <rosenvi...@gmail.com> wrote:
>>
>>> Which version of Spark are you using?  It seems like the issue here is
>>> that the map output statuses are too large to fit in the Akka frame size.
>>> This issue has been fixed in Spark 1.2 by using a different encoding for
>>> map outputs for jobs with many reducers (
>>> https://issues.apache.org/jira/browse/SPARK-3613).  On earlier Spark
>>> versions, your options are either reducing the number of reducers (e.g. by
>>> explicitly specifying the number of reducers in the reduceByKey() call)
>>> or increasing the Akka frame size (via the spark.akka.frameSize
>>> configuration option).
>>>
>>> On Sat, Jan 3, 2015 at 10:40 AM, Saeed Shahrivari <
>>> saeed.shahriv...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to get the frequency of each Unicode char in a document
>>>> collection using Spark. Here is the code snippet that does the job:
>>>>
>>>>         JavaPairRDD<LongWritable, Text> rows =
>>>>                 sc.sequenceFile(args[0], LongWritable.class, Text.class);
>>>>         rows = rows.coalesce(10000);
>>>>
>>>>         JavaPairRDD<Character, Long> pairs = rows.flatMapToPair(t -> {
>>>>             String content = t._2.toString();
>>>>             Multiset<Character> chars = HashMultiset.create();
>>>>             for (int i = 0; i < content.length(); i++)
>>>>                 chars.add(content.charAt(i));
>>>>             List<Tuple2<Character, Long>> list =
>>>>                     new ArrayList<Tuple2<Character, Long>>();
>>>>             for (Character ch : chars.elementSet()) {
>>>>                 list.add(new Tuple2<Character, Long>(ch, (long) chars.count(ch)));
>>>>             }
>>>>             return list;
>>>>         });
>>>>
>>>>         JavaPairRDD<Character, Long> counts = pairs.reduceByKey((a, b) -> a + b);
>>>>         System.out.printf("MapCount %,d\n", counts.count());
>>>>
>>>> But, I get the following exception:
>>>>
>>>> 15/01/03 21:51:34 ERROR MapOutputTrackerMasterActor: Map output statuses were 11141547 bytes which exceeds spark.akka.frameSize (10485760 bytes).
>>>> org.apache.spark.SparkException: Map output statuses were 11141547 bytes which exceeds spark.akka.frameSize (10485760 bytes).
>>>>         at org.apache.spark.MapOutputTrackerMasterActor$$anonfun$receiveWithLogging$1.applyOrElse(MapOutputTracker.scala:59)
>>>>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>>         at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
>>>>         at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
>>>>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>>         at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
>>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>         at org.apache.spark.MapOutputTrackerMasterActor.aroundReceive(MapOutputTracker.scala:42)
>>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>> Would you please tell me where the fault is?
>>>> If I process fewer rows, there is no problem. However, when the number of
>>>> rows is large, I always get this exception.
>>>>
>>>> Thanks beforehand.
>>>>
>>>>
>>>>
>>>
>>
>
