I'm running a distributed KMeans algorithm with 4 executors. I have an RDD[Data]. I use mapPartitions to run a learner on each data partition, and then call reduce with my custom model-reduce function to merge the resulting models before starting a new iteration.
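Roughly, one iteration looks like the sketch below (the Data/Model types, trainPartition, reduceModels, and the input path are simplified stand-ins for my actual code, not the real implementation):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object KMeansIterationSketch {
  // Simplified stand-ins for the real data point and model types.
  case class Data(features: Array[Double])
  case class Model(centers: Array[Array[Double]])

  // Run the learner over one partition and emit a single partial model.
  def trainPartition(points: Iterator[Data]): Iterator[Model] = {
    val pts = points.toArray
    // ... fit local cluster centers on `pts` here ...
    Iterator(Model(Array.empty))
  }

  // Custom model reduce: merge two partial models into one.
  def reduceModels(a: Model, b: Model): Model = {
    // ... e.g. combine centers weighted by the number of points seen ...
    a
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("kmeans-iteration"))
    val data: RDD[Data] = sc.objectFile[Data]("hdfs:///path/to/data") // placeholder input

    // One iteration: each of the 4 tasks produces a ~330 MB model,
    // and reduce() merges them with the custom function.
    val model: Model = data
      .mapPartitions(trainPartition)
      .reduce(reduceModels)

    // ... broadcast `model` and start the next iteration ...
    sc.stop()
  }
}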
The model size is around ~330 MB. I would expect the memory required for the serialized results at the driver to be at most 2 * 330 MB, since only two models need to be held to reduce them, but it looks like Spark serializes all of the models to the driver before reducing. The error message says the total size of the serialized results is 1345.5 MB, which is approximately 4 * 330 MB. I know I can raise the driver's max result size (sketched after the stack trace below), but I just want to confirm that this is the expected behavior.

Thanks!
James

[Stage 0:==============>                (1 + 3) / 4]
16/02/19 05:59:28 ERROR TaskSetManager: Total size of serialized results of 4 tasks (1345.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 4 tasks (1345.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1007)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
  at org.apache.spark.rdd.RDD.reduce(RDD.scala:989)
  at BIDMach.RunOnSpark$.runOnSpark(RunOnSpark.scala:50)
  ... 50 elided
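For reference, this is how I would raise the limit if this does turn out to be the expected behavior (the 2g value is just an example, not a recommendation):

import org.apache.spark.SparkConf

// Via SparkConf when building the context:
val conf = new SparkConf()
  .setAppName("kmeans-iteration")
  .set("spark.driver.maxResultSize", "2g") // default is 1g; 0 means unlimited

// Or equivalently on the command line:
//   spark-submit --conf spark.driver.maxResultSize=2g ...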