[jira] [Commented] (MAHOUT-1790) SparkEngine nnz overflow resultSize when reducing.
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823400#comment-15823400 ]

Andrew Palumbo commented on MAHOUT-1790:
----------------------------------------

[~dlyubimov] Do you have any thoughts on this?

> SparkEngine nnz overflow resultSize when reducing.
> --------------------------------------------------
>
> Key: MAHOUT-1790
> URL: https://issues.apache.org/jira/browse/MAHOUT-1790
> Project: Mahout
> Issue Type: Bug
> Components: spark
> Affects Versions: 0.11.1
> Reporter: Michel Lemay
> Assignee: Andrew Palumbo
> Priority: Minor
> Fix For: 0.13.0
>
> When counting numNonZeroElementsPerColumn in the Spark engine with a large number of columns, we get the following error:
>
> ERROR TaskSetManager: Total size of serialized results of nnn tasks (1031.7 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>
> and then the call stack:
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 267 tasks (1024.1 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>   at scala.Option.foreach(Option.scala:236)
>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1942)
>   at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1003)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.reduce(RDD.scala:985)
>   at org.apache.mahout.sparkbindings.SparkEngine$.numNonZeroElementsPerColumn(SparkEngine.scala:86)
>   at org.apache.mahout.math.drm.CheckpointedOps.numNonZeroElementsPerColumn(CheckpointedOps.scala:37)
>   at org.apache.mahout.math.cf.SimilarityAnalysis$.sampleDownAndBinarize(SimilarityAnalysis.scala:286)
>   at org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrences(SimilarityAnalysis.scala:66)
>   at org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrencesIDSs(SimilarityAnalysis.scala:141)
>
> This occurs because it uses a DenseVector, and Spark seemingly aggregates all of them on the driver before reducing.
> I think this could easily be prevented with a treeReduce(_ += _, depth) instead of a reduce(_ += _).
> 'depth' could be computed as a function of 'n' and the number of partitions, something along the lines of:
>
> val maxResultSize = 
> val numPartitions = drm.rdd.partitions.size
> val n = drm.ncol
> val bytesPerVector = n * 8 + overhead?
> val maxVectors = maxResultSize / bytesPerVector / 2 + 1 // be safe
> val depth = math.max(1, math.ceil(math.log(1 + numPartitions / maxVectors) / math.log(2)).toInt)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
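The depth heuristic sketched in the quoted description can be fleshed out as follows. This is an illustration, not Mahout code: `TreeReduceDepth` and `treeReduceDepth` are hypothetical names, the 1 GB default mirrors Spark's default `spark.driver.maxResultSize`, and the 64-byte per-vector serialization overhead is a guess.

```scala
object TreeReduceDepth {
  // Heuristic from the issue description: choose a treeReduce depth so that
  // no single reduction level collects more than roughly half of the
  // driver's result-size cap worth of dense vectors.
  def treeReduceDepth(ncol: Long,
                      numPartitions: Int,
                      maxResultSize: Long = 1L << 30, // assumed 1 GB cap
                      overheadBytes: Long = 64L       // guessed per-vector overhead
                     ): Int = {
    val bytesPerVector = ncol * 8L + overheadBytes // dense vector of doubles
    // Halve the budget to be safe; never allow zero vectors per level.
    val maxVectors = math.max(1L, maxResultSize / bytesPerVector / 2L)
    // Levels needed so each level merges at most maxVectors partial results.
    math.max(1, math.ceil(math.log1p(numPartitions.toDouble / maxVectors) / math.log(2)).toInt)
  }

  def main(args: Array[String]): Unit = {
    println(treeReduceDepth(10L, 8))           // narrow matrix: depth 1, plain reduce is fine
    println(treeReduceDepth(100000000L, 267))  // very wide matrix: depth 9, multi-level tree
  }
}
```

With such a depth in hand, `SparkEngine.numNonZeroElementsPerColumn` could call `treeReduce(_ += _, depth)` in place of `reduce(_ += _)`; `treeReduce` performs intermediate reductions on executors, so the driver only ever receives a bounded number of partial vectors instead of one per task.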
[jira] [Commented] (MAHOUT-1790) SparkEngine nnz overflow resultSize when reducing.
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15761140#comment-15761140 ]

Michel Lemay commented on MAHOUT-1790:
--------------------------------------

Right now we are not using Mahout in production code. We found that matrix operations on distributed Spark clusters tend to be far too slow, and we prefer a shared-memory approach on bigger machines.

> SparkEngine nnz overflow resultSize when reducing.
> --------------------------------------------------
>
> Key: MAHOUT-1790
> URL: https://issues.apache.org/jira/browse/MAHOUT-1790
> Project: Mahout
> Issue Type: Bug
> Components: spark
> Affects Versions: 0.11.1
> Reporter: Michel Lemay
> Assignee: Dmitriy Lyubimov
> Priority: Minor
> Fix For: 0.13.0
[jira] [Commented] (MAHOUT-1790) SparkEngine nnz overflow resultSize when reducing.
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15760250#comment-15760250 ]

Andrew Palumbo commented on MAHOUT-1790:
----------------------------------------

What's the status here [~FlamingMike]?

> SparkEngine nnz overflow resultSize when reducing.
> --------------------------------------------------
>
> Key: MAHOUT-1790
> URL: https://issues.apache.org/jira/browse/MAHOUT-1790
> Project: Mahout
> Issue Type: Bug
> Components: spark
> Affects Versions: 0.11.1
> Reporter: Michel Lemay
> Assignee: Dmitriy Lyubimov
> Priority: Minor
> Fix For: 0.13.0
[jira] [Commented] (MAHOUT-1790) SparkEngine nnz overflow resultSize when reducing.
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15279990#comment-15279990 ]

Michel Lemay commented on MAHOUT-1790:
--------------------------------------

Although there is a simple workaround in configuration, I don't think it is a good design choice to close this issue as not-a-bug when such a simple fix can be made to make it robust to this scenario. When computing the log-likelihood of an event (which requires computing nnz) on a very large but sparse matrix, you end up crashing Spark. The Spark driver should never have to deal with a very large collection of data; the whole design of map-reduce is meant to prevent exactly this kind of wasteful aggregation of data on a single node. It's your call, but I don't agree with your resolution.

> SparkEngine nnz overflow resultSize when reducing.
> --------------------------------------------------
>
> Key: MAHOUT-1790
> URL: https://issues.apache.org/jira/browse/MAHOUT-1790
> Project: Mahout
> Issue Type: Bug
> Components: spark
> Affects Versions: 0.11.1
> Reporter: Michel Lemay
> Assignee: Dmitriy Lyubimov
> Priority: Minor
> Fix For: 1.0.0
[jira] [Commented] (MAHOUT-1790) SparkEngine nnz overflow resultSize when reducing.
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15277548#comment-15277548 ]

Andrew Palumbo commented on MAHOUT-1790:
----------------------------------------

[~dlyubimov], [~pferrel] is there something to do here?

> SparkEngine nnz overflow resultSize when reducing.
> --------------------------------------------------
>
> Key: MAHOUT-1790
> URL: https://issues.apache.org/jira/browse/MAHOUT-1790
> Project: Mahout
> Issue Type: Bug
> Components: spark
> Affects Versions: 0.11.1
> Reporter: Michel Lemay
> Assignee: Dmitriy Lyubimov
> Priority: Minor
> Fix For: 0.12.1
[jira] [Commented] (MAHOUT-1790) SparkEngine nnz overflow resultSize when reducing.
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15269259#comment-15269259 ]

Andrew Palumbo commented on MAHOUT-1790:
----------------------------------------

Is this a valid issue?

> SparkEngine nnz overflow resultSize when reducing.
> --------------------------------------------------
>
> Key: MAHOUT-1790
> URL: https://issues.apache.org/jira/browse/MAHOUT-1790
> Project: Mahout
> Issue Type: Bug
> Components: spark
> Affects Versions: 0.11.1
> Reporter: Michel Lemay
> Assignee: Dmitriy Lyubimov
> Priority: Minor
> Fix For: 0.12.1
[jira] [Commented] (MAHOUT-1790) SparkEngine nnz overflow resultSize when reducing.
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15212117#comment-15212117 ]

Dmitriy Lyubimov commented on MAHOUT-1790:
------------------------------------------

[~smarthi]

(1) Actually, I don't know why this comment (of mine) got in here. This is a different issue altogether; it is an issue of a Spark setting. If a large volume of information is collected to the front end, Spark has a setting that caps it, by default at 1 GB, apparently in an attempt to protect the backend from runaway processes and ill-behaved programs. If there is a legitimately big collection to the front end that exceeds 1 GB, then the solution is just to bump up this setting in Spark (spark.driver.maxResultSize, as named in the error message).

(2) I still have the question of why the invoked operation would cause such a big collection to the front end; this needs to be investigated. However, this capability (numNonZero etc.) was added by either [~ssc] or [~pferrel], so I'd suggest those folks take a look at it again. I don't have first-hand knowledge of this logic. If everything else fails, I'll take a look at some point, but not soon, as it is not a priority for me.

> SparkEngine nnz overflow resultSize when reducing.
> --------------------------------------------------
>
> Key: MAHOUT-1790
> URL: https://issues.apache.org/jira/browse/MAHOUT-1790
> Project: Mahout
> Issue Type: Bug
> Components: spark
> Affects Versions: 0.11.1
> Reporter: Michel Lemay
> Assignee: Dmitriy Lyubimov
> Priority: Minor
> Fix For: 0.12.0
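The setting-based workaround Dmitriy describes can be applied without touching code. A sketch, where the 2g value is an arbitrary example (the setting's real name, spark.driver.maxResultSize, appears in the error message in the issue description):

```shell
# Option 1: per job, on the spark-submit command line
spark-submit --conf spark.driver.maxResultSize=2g ...

# Option 2: cluster-wide, as a line in conf/spark-defaults.conf:
#   spark.driver.maxResultSize  2g
```

Note this only raises the ceiling; the treeReduce change proposed in the issue description avoids the driver-side aggregation altogether.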
[jira] [Commented] (MAHOUT-1790) SparkEngine nnz overflow resultSize when reducing.
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15194674#comment-15194674 ]

Suneel Marthi commented on MAHOUT-1790:
---------------------------------------

Any updates on this?

> SparkEngine nnz overflow resultSize when reducing.
> --------------------------------------------------
>
>                 Key: MAHOUT-1790
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1790
>             Project: Mahout
>          Issue Type: Bug
>          Components: spark
>    Affects Versions: 0.11.1
>            Reporter: Michel Lemay
>            Assignee: Dmitriy Lyubimov
>            Priority: Minor
>             Fix For: 0.12.0
>
> When counting numNonZeroElementsPerColumn in the Spark engine with a large
> number of columns, we get the following error:
>
> ERROR TaskSetManager: Total size of serialized results of nnn tasks (1031.7
> MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>
> and then the call stack:
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Total size
> of serialized results of 267 tasks (1024.1 MB) is bigger than
> spark.driver.maxResultSize (1024.0 MB)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>     at scala.Option.foreach(Option.scala:236)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1942)
>     at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1003)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
>     at org.apache.spark.rdd.RDD.reduce(RDD.scala:985)
>     at org.apache.mahout.sparkbindings.SparkEngine$.numNonZeroElementsPerColumn(SparkEngine.scala:86)
>     at org.apache.mahout.math.drm.CheckpointedOps.numNonZeroElementsPerColumn(CheckpointedOps.scala:37)
>     at org.apache.mahout.math.cf.SimilarityAnalysis$.sampleDownAndBinarize(SimilarityAnalysis.scala:286)
>     at org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrences(SimilarityAnalysis.scala:66)
>     at org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrencesIDSs(SimilarityAnalysis.scala:141)
>
> This occurs because the reduction uses a DenseVector per partition, and Spark
> seemingly aggregates all of them on the driver before reducing.
>
> I think this could easily be prevented with a treeReduce(_ += _, depth)
> instead of a reduce(_ += _). 'depth' could be computed as a function of 'n'
> and the number of partitions, something along the lines of:
>
> val maxResultSize =
> val numPartitions = drm.rdd.partitions.size
> val n = drm.ncol
> val bytesPerVector = n * 8 + overhead?
> val maxVectors = maxResultSize / bytesPerVector / 2 + 1 // be safe
> val depth = math.max(1, math.ceil(math.log(1 + numPartitions / maxVectors) / math.log(2)).toInt)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
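The depth heuristic sketched above can be written out as a self-contained Scala function. Note the assumptions: Spark's 1 GB default for spark.driver.maxResultSize, a guessed 64-byte per-vector object overhead, and illustrative names (TreeReduceDepth, depthFor); this is a sketch of the arithmetic, not Mahout code.

```scala
object TreeReduceDepth {
  // Heuristic treeReduce depth, following the sketch in the issue: cap how
  // many dense vectors may coexist at one aggregation level, then take log2
  // of the partitions-to-capacity ratio.
  def depthFor(numPartitions: Int,
               ncol: Long,
               maxResultSize: Long = 1024L * 1024 * 1024, // Spark's "1g" default (assumption)
               overheadBytes: Long = 64L): Int = {        // guessed per-vector overhead
    val bytesPerVector = ncol * 8 + overheadBytes           // dense vector of doubles
    val maxVectors = maxResultSize / bytesPerVector / 2 + 1 // halve to be safe
    math.max(1, math.ceil(
      math.log(1.0 + numPartitions.toDouble / maxVectors) / math.log(2.0)).toInt)
  }
}
```

With the numbers from the report above (267 partitions, vectors of roughly 4 MB each, i.e. about 500k columns), this yields a depth of 2, which is enough to keep any one aggregation level under the 1 GB limit.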
[jira] [Commented] (MAHOUT-1790) SparkEngine nnz overflow resultSize when reducing.
[ https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15005677#comment-15005677 ]

Dmitriy Lyubimov commented on MAHOUT-1790:
------------------------------------------

+1. treeReduce() is a new API that appeared in Spark 1.3, which means this fix
is applicable to the Mahout 0.11.x+ branches. The only complaint I have heard
about this API is "why didn't they do it this way in the first place", and they
may have taken care of optimizing the depth too. It is semantically equivalent
to reduce().

[~FlamingMike] in the spirit of the Apache "power of do", can you try to
suggest a patch?
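The claim that treeReduce() is semantically equivalent to reduce() can be illustrated without a cluster. The helper below only mimics the tree-shaped aggregation pattern in plain Scala; it is not the Spark API, and the object and method names are made up for this sketch.

```scala
object TreeReduceDemo {
  // Reduce pairwise, level by level, so no single step ever combines more
  // than two partial results at once -- the shape a tree reduction uses,
  // as opposed to a flat reduce that funnels everything to one place.
  def treeReduce[A](xs: Seq[A])(f: (A, A) => A): A = {
    require(xs.nonEmpty, "cannot reduce an empty collection")
    if (xs.size == 1) xs.head
    else treeReduce(xs.grouped(2).map(_.reduce(f)).toSeq)(f)
  }
}
```

Because vector addition is associative and commutative, combining the per-partition nnz vectors two at a time produces the same column counts as a flat reduce, while bounding how many partial results must be held at any one level, which is exactly why the swap is behavior-preserving for numNonZeroElementsPerColumn.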