I'm running a distributed KMeans algorithm with 4 executors.

I have a RDD[Data]. I use mapPartition to run a learner on each data
partition, and then call reduce with my custom model reduce function
to reduce the result of the model to start a new iteration.

The model size is around ~330 MB. I would expect that the required
memory for the serialized result at the driver to be at most 2*300 MB
in order to reduce two models, but it looks like Spark is serializing
all of the models to the driver before reducing.

The error message says that the total size of the serialized results
is 1345.5MB, which is approximately 4 * 330 MB. I know I can set the
driver's max result size, but I just want to confirm that this is
expected behavior.

Thanks!

James

Stage 0:==============>                                            (1
+ 3) / 4]16/02/19 05:59:28 ERROR TaskSetManager: Total size of
serialized results of 4 tasks (1345.5 MB) is bigger than
spark.driver.maxResultSize (1024.0 MB)

org.apache.spark.SparkException: Job aborted due to stage failure:
Total size of serialized results of 4 tasks (1345.5 MB) is bigger than
spark.driver.maxResultSize (1024.0 MB)

  at org.apache.spark.scheduler.DAGScheduler.org
<http://org.apache.spark.scheduler.dagscheduler.org/>$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)

  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)

  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)

  at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)

  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)

  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)

  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)

  at scala.Option.foreach(Option.scala:257)

  at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)

  at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)

  at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)

  at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)

  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)

  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)

  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)

  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1007)

  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)

  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)

  at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)

  at org.apache.spark.rdd.RDD.reduce(RDD.scala:989)

  at BIDMach.RunOnSpark$.runOnSpark(RunOnSpark.scala:50)

  ... 50 elided

Reply via email to