[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andrew Palumbo updated MAHOUT-1790: ----------------------------------- Sprint: (was: Jan/Feb-2017) > SparkEngine nnz overflow resultSize when reducing. > -------------------------------------------------- > > Key: MAHOUT-1790 > URL: https://issues.apache.org/jira/browse/MAHOUT-1790 > Project: Mahout > Issue Type: Bug > Components: spark > Affects Versions: 0.11.1 > Reporter: Michel Lemay > Assignee: Andrew Palumbo > Priority: Minor > Fix For: 0.13.1 > > > When counting numNonZeroElementsPerColumn in spark engine with large number > of columns, we get the following error: > ERROR TaskSetManager: Total size of serialized results of nnn tasks (1031.7 > MB) is bigger than spark.driver.maxResultSize (1024.0 MB) > and then, the call stack: > org.apache.spark.SparkException: Job aborted due to stage failure: Total size > of serialized results of 267 tasks (1024.1 MB) is bigger than > spark.driver.maxResultSize (1024.0 MB) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) > at scala.Option.foreach(Option.scala:236) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1942) > at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1003) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:306) > at org.apache.spark.rdd.RDD.reduce(RDD.scala:985) > at > org.apache.mahout.sparkbindings.SparkEngine$.numNonZeroElementsPerColumn(SparkEngine.scala:86) > at > org.apache.mahout.math.drm.CheckpointedOps.numNonZeroElementsPerColumn(CheckpointedOps.scala:37) > at > org.apache.mahout.math.cf.SimilarityAnalysis$.sampleDownAndBinarize(SimilarityAnalysis.scala:286) > at > org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrences(SimilarityAnalysis.scala:66) > at > org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrencesIDSs(SimilarityAnalysis.scala:141) > This occurs because it uses a DenseVector and spark seemingly aggregate all > of them on the driver before reducing. > I think this could be easily prevented with a treeReduce(_ += _, depth) > instead of a reduce(_ += _) > 'depth' could be computed in function of 'n' and numberOfPartitions.. > something in the line of: > val maxResultSize = .... > val numPartitions = drm.rdd.partitions.size > val n = drm.ncol > val bytesPerVector = n * 8 + overhead? > val maxVectors = maxResultSize / bytes / 2 + 1 // be safe > val depth = math.max(1, math.ceil(math.log(1 + numPartitions / maxVectors) > / math.log(2)).toInt) -- This message was sent by Atlassian JIRA (v6.3.15#6346)