[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15212117#comment-15212117 ]
Dmitriy Lyubimov edited comment on MAHOUT-1790 at 3/25/16 5:25 PM:
-------------------------------------------------------------------

[~smarthi]

(1) Actually, I don't know why this comment (of mine) got in here. This is a different issue altogether -- it is an issue of a Spark setting. If a job collects a large volume of data to the front end, Spark by default limits the collected size to a certain value (1 GB), apparently in an attempt to keep the backend robust against runaway processes and ill-behaved code. If there is a legitimate big collection to the front end that exceeds 1 GB, the solution is simply to bump up this setting in Spark (spark.driver.maxResultSize, the same setting named in the error below).

(2) I still have a question as to why the invoked operation would cause such a big collection to the front end -- this needs to be investigated. However, this capability (numNonZero etc.) was added by either [~ssc] or [~pferrel], so I'd suggest those folks take a look at it again. I don't have first-hand knowledge of this logic. If everything else fails, I'll take a look at some point -- but not soon, as it is not a priority for me.
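For illustration, raising the limit programmatically looks like the following sketch (the 2g figure is an arbitrary example, and the same setting can equally be passed at submit time via spark-submit --conf spark.driver.maxResultSize=2g):

    import org.apache.spark.{SparkConf, SparkContext}

    // Raise the cap on the total size of serialized task results collected
    // to the driver. Spark's default is 1g; setting "0" removes the limit.
    val conf = new SparkConf()
      .setAppName("mahout-spark-job")          // hypothetical application name
      .set("spark.driver.maxResultSize", "2g")
    val sc = new SparkContext(conf)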
> SparkEngine nnz overflow resultSize when reducing.
> --------------------------------------------------
>
>                 Key: MAHOUT-1790
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1790
>             Project: Mahout
>          Issue Type: Bug
>          Components: spark
>    Affects Versions: 0.11.1
>            Reporter: Michel Lemay
>            Assignee: Dmitriy Lyubimov
>            Priority: Minor
>             Fix For: 0.12.0
>
>
> When counting numNonZeroElementsPerColumn in the Spark engine with a large number of columns, we get the following error:
> ERROR TaskSetManager: Total size of serialized results of nnn tasks (1031.7 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
> and then, the call stack:
> org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 267 tasks (1024.1 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1942)
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1003)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
> at org.apache.spark.rdd.RDD.reduce(RDD.scala:985)
> at org.apache.mahout.sparkbindings.SparkEngine$.numNonZeroElementsPerColumn(SparkEngine.scala:86)
> at org.apache.mahout.math.drm.CheckpointedOps.numNonZeroElementsPerColumn(CheckpointedOps.scala:37)
> at org.apache.mahout.math.cf.SimilarityAnalysis$.sampleDownAndBinarize(SimilarityAnalysis.scala:286)
> at org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrences(SimilarityAnalysis.scala:66)
> at org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrencesIDSs(SimilarityAnalysis.scala:141)
> This occurs because it uses a DenseVector, and Spark seemingly aggregates all of them on the driver before reducing.
> I think this could easily be prevented with a treeReduce(_ += _, depth) instead of a reduce(_ += _).
> 'depth' could be computed as a function of 'n' and numberOfPartitions, something along the lines of:
> val maxResultSize = ....
> val numPartitions = drm.rdd.partitions.size
> val n = drm.ncol
> val bytesPerVector = n * 8 + overhead?
> val maxVectors = maxResultSize / bytesPerVector / 2 + 1 // be safe
> val depth = math.max(1, math.ceil(math.log(1 + numPartitions / maxVectors) / math.log(2)).toInt)
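For concreteness, a minimal self-contained sketch of the treeReduce idea suggested above (the RDD of per-partition count arrays, the function name nnzPerColumn, and the 64-byte overhead constant are illustrative assumptions, not the actual SparkEngine code):

    import org.apache.spark.rdd.RDD

    // nnzCounts holds one Array[Long] of per-column nonzero counts per partition.
    def nnzPerColumn(nnzCounts: RDD[Array[Long]], n: Int, maxResultSize: Long): Array[Long] = {
      val numPartitions = nnzCounts.partitions.length
      val bytesPerVector = n * 8L + 64L       // 8 bytes per Long plus assumed object overhead
      val maxVectors = math.max(1L, maxResultSize / bytesPerVector / 2)  // halve to be safe
      val depth = math.max(1,
        math.ceil(math.log1p(numPartitions.toDouble / maxVectors) / math.log(2)).toInt)
      // treeReduce merges partial vectors on the executors over `depth` rounds,
      // so the driver receives only a few results instead of one per task.
      nnzCounts.treeReduce((a, b) => {
        var i = 0
        while (i < n) { a(i) += b(i); i += 1 }
        a
      }, depth)
    }

treeReduce has been part of the core RDD API since Spark 1.3, so this would introduce no new dependency.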
-- This message was sent by Atlassian JIRA (v6.3.4#6332)