[jira] [Commented] (MAHOUT-1790) SparkEngine nnz overflow resultSize when reducing.

2017-01-15 Thread Andrew Palumbo (JIRA)

[ 
https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823400#comment-15823400
 ] 

Andrew Palumbo commented on MAHOUT-1790:
--

[~dlyubimov] Do you have any thoughts on this?  

> SparkEngine nnz overflow resultSize when reducing.
> --
>
> Key: MAHOUT-1790
> URL: https://issues.apache.org/jira/browse/MAHOUT-1790
> Project: Mahout
>  Issue Type: Bug
>  Components: spark
>Affects Versions: 0.11.1
>Reporter: Michel Lemay
>Assignee: Andrew Palumbo
>Priority: Minor
> Fix For: 0.13.0
>
>
> When counting numNonZeroElementsPerColumn in the spark engine with a large 
> number of columns, we get the following error:
> ERROR TaskSetManager: Total size of serialized results of nnn tasks (1031.7 
> MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
> and then, the call stack:
> org.apache.spark.SparkException: Job aborted due to stage failure: Total size 
> of serialized results of 267 tasks (1024.1 MB) is bigger than 
> spark.driver.maxResultSize (1024.0 MB)
> at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
> at scala.Option.foreach(Option.scala:236)
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1942)
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1003)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
> at org.apache.spark.rdd.RDD.reduce(RDD.scala:985)
> at 
> org.apache.mahout.sparkbindings.SparkEngine$.numNonZeroElementsPerColumn(SparkEngine.scala:86)
> at 
> org.apache.mahout.math.drm.CheckpointedOps.numNonZeroElementsPerColumn(CheckpointedOps.scala:37)
> at 
> org.apache.mahout.math.cf.SimilarityAnalysis$.sampleDownAndBinarize(SimilarityAnalysis.scala:286)
> at 
> org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrences(SimilarityAnalysis.scala:66)
> at 
> org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrencesIDSs(SimilarityAnalysis.scala:141)
> This occurs because each task produces a DenseVector and spark seemingly 
> aggregates all of them on the driver before reducing.
> I think this could easily be prevented with a treeReduce(_ += _, depth) 
> instead of a reduce(_ += _).
> 'depth' could be computed as a function of 'n' and the number of partitions, 
> something along the lines of:
>   val maxResultSize = ... // spark.driver.maxResultSize, in bytes
>   val numPartitions = drm.rdd.partitions.size
>   val n = drm.ncol
>   val bytesPerVector = n * 8 // plus some serialization overhead
>   val maxVectors = maxResultSize / bytesPerVector / 2 + 1 // be safe
>   val depth = math.max(1, math.ceil(math.log(1 + numPartitions / maxVectors)
>     / math.log(2)).toInt)
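
For illustration, here is a minimal self-contained sketch of the proposed 
change; the names (nnzPerColumn, rows, ncol, depth) are assumptions made for 
the example, not the actual SparkEngine code. Plugging in the numbers from the 
error above (267 task results totalling roughly 1 GB, i.e. about 4 MB per 
vector) gives maxVectors of about 129 and depth = ceil(log2(1 + 267/129)) = 2.

  import org.apache.spark.rdd.RDD

  // Count non-zero elements per column with a tree-shaped reduction.
  // `rows` stands in for the matrix as an RDD of sparse rows, each a
  // sequence of (column, value) pairs; `depth` comes from the heuristic above.
  def nnzPerColumn(rows: RDD[Seq[(Int, Double)]], ncol: Int, depth: Int): Array[Long] =
    rows
      .map { row =>
        val acc = new Array[Long](ncol) // dense per-row count vector
        for ((col, value) <- row if value != 0.0) acc(col) += 1L
        acc
      }
      // Plain reduce ships one partial vector per task to the driver, which
      // is what overflows spark.driver.maxResultSize; treeReduce combines
      // the partials on executors over ~depth levels instead.
      .treeReduce({ (a, b) =>
        var i = 0
        while (i < ncol) { a(i) += b(i); i += 1 }
        a
      }, depth)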



[jira] [Commented] (MAHOUT-1790) SparkEngine nnz overflow resultSize when reducing.

2016-12-19 Thread Michel Lemay (JIRA)

[ 
https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15761140#comment-15761140
 ] 

Michel Lemay commented on MAHOUT-1790:
--

Right now, we are not using Mahout in production code. We found that matrix 
operations on distributed clusters with spark tend to be far too slow, and we 
prefer a shared-memory approach on bigger machines.

[jira] [Commented] (MAHOUT-1790) SparkEngine nnz overflow resultSize when reducing.

2016-12-18 Thread Andrew Palumbo (JIRA)

[ 
https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15760250#comment-15760250
 ] 

Andrew Palumbo commented on MAHOUT-1790:
--

What's the status here [~FlamingMike]?

[jira] [Commented] (MAHOUT-1790) SparkEngine nnz overflow resultSize when reducing.

2016-05-11 Thread Michel Lemay (JIRA)

[ 
https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15279990#comment-15279990
 ] 

Michel Lemay commented on MAHOUT-1790:
--

Although there is a simple workaround in configuration, I don't think it is a 
good design choice to close this issue as not-a-bug when such a simple fix can 
be made to make it robust to this scenario.

When computing the log-likelihood of an event (which requires computing nnz) 
on a very large but sparse matrix, you end up crashing spark. The spark driver 
should never have to deal with a very large collection of data; the whole 
design of map-reduce is to prevent such wasteful aggregation of data on a 
single node.

It's your call, but I don't agree with your resolution.



[jira] [Commented] (MAHOUT-1790) SparkEngine nnz overflow resultSize when reducing.

2016-05-09 Thread Andrew Palumbo (JIRA)

[ 
https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15277548#comment-15277548
 ] 

Andrew Palumbo commented on MAHOUT-1790:
--

[~dlyubimov], [~pferrel] is there something to do here?

[jira] [Commented] (MAHOUT-1790) SparkEngine nnz overflow resultSize when reducing.

2016-05-03 Thread Andrew Palumbo (JIRA)

[ 
https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15269259#comment-15269259
 ] 

Andrew Palumbo commented on MAHOUT-1790:
--

Is this a valid issue?

[jira] [Commented] (MAHOUT-1790) SparkEngine nnz overflow resultSize when reducing.

2016-03-25 Thread Dmitriy Lyubimov (JIRA)

[ 
https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15212117#comment-15212117
 ] 

Dmitriy Lyubimov commented on MAHOUT-1790:
--

[~smarthi]
(1) Actually I don't know why this comment (of mine) got in here. This is a 
different issue altogether -- it is an issue of a spark setting. If a large 
volume of information is collected to the front end, spark has a setting that 
limits it, by default to 1Gb, apparently in an attempt to maintain the 
robustness of the backend w.r.t. runaway processes and ill programming. If 
there is a legitimate big collection to the front end that exceeds 1Gb, then 
the solution is just to bump up this setting in spark 
(spark.driver.maxResultSize).
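
For reference, a sketch of that workaround; the app name and the 4g value are 
illustrative, not taken from this issue:

  import org.apache.spark.{SparkConf, SparkContext}

  // Raise the driver-side cap on collected results before creating the
  // context; a value of "0" disables the limit entirely.
  val conf = new SparkConf()
    .setAppName("cooccurrence-analysis")     // hypothetical app name
    .set("spark.driver.maxResultSize", "4g") // default is 1g
  val sc = new SparkContext(conf)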

(2) I still have a question as to why the invoked operation would cause such a 
big collection to the front end -- this needs to be investigated. However, 
this capability (numNonZero etc.) was added by either [~ssc] or [~pferrel], so 
I'd suggest those folks take a look at it again. I don't have first-hand 
knowledge of this logic.

If everything else fails, I'll take a look at some point -- but not soon, as 
it is not a priority for me.






[jira] [Commented] (MAHOUT-1790) SparkEngine nnz overflow resultSize when reducing.

2016-03-14 Thread Suneel Marthi (JIRA)

[ 
https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15194674#comment-15194674
 ] 

Suneel Marthi commented on MAHOUT-1790:
---

Any updates on this?

[jira] [Commented] (MAHOUT-1790) SparkEngine nnz overflow resultSize when reducing.

2015-11-14 Thread Dmitriy Lyubimov (JIRA)

[ 
https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15005677#comment-15005677
 ] 

Dmitriy Lyubimov commented on MAHOUT-1790:
--

+1. treeReduce() is a new API that appeared in spark 1.3. The only complaint 
about this API I have heard was "why didn't they do it this way in the first 
place", and they may have taken care of optimizing the depth too. It is 
semantically equivalent to reduce(), which means this fix is applicable to the 
Mahout 0.11.x+ branches.
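
A toy illustration of that equivalence, assuming an existing SparkContext sc:

  // reduce and treeReduce return the same value; they differ only in where
  // the partial task results get combined.
  val xs = sc.parallelize(1 to 1000, numSlices = 100)
  val flat = xs.reduce(_ + _)        // all 100 task results go to the driver
  val tree = xs.treeReduce(_ + _, 4) // combined on executors, tree of depth 4
  assert(flat == tree)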

[~FlamingMike], in the spirit of the Apache "power of do", can you try to 
suggest a patch?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)