[jira] [Commented] (MAHOUT-1817) Implement caching in Flink Bindings

2016-03-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MAHOUT-1817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15212404#comment-15212404
 ] 

ASF GitHub Bot commented on MAHOUT-1817:


GitHub user andrewpalumbo opened a pull request:

https://github.com/apache/mahout/pull/203

MAHOUT-1817  Implement caching in Flink Bindings



As a temporary measure, use this method to persist the `DataSet` to the 
filesystem when caching, rather than going through drmDfsRead()/drmDfsWrite().

Todo:

1. Break up into `persist` and `readPersistedDataset` methods, and only read a 
persisted dataset if it has already been cached (a rough sketch follows below).
2. Use a property setting for the base dir.
3. Check that this method maintains the dataset's degree of parallelism; if 
not, set the parallelism degree of the re-read dataset to that of the original.
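A rough, hedged sketch of the split described in Todo item 1 (this is not the 
code in this PR; the `DataSetCache` object, the base directory, and the use of 
Flink's `TypeSerializerOutputFormat`/`TypeSerializerInputFormat` are all 
assumptions for illustration):

    // Illustrative sketch only: persist a DataSet to the filesystem in Flink's
    // binary serialization format and read it back later.
    import scala.reflect.ClassTag

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat}
    import org.apache.flink.api.scala._
    import org.apache.flink.core.fs.FileSystem.WriteMode

    object DataSetCache {

      // Hypothetical base directory; Todo item 2 suggests making this a property setting.
      val baseDir = "/tmp/mahout-flink-cache"

      /** Write the DataSet out as Flink-serialized files and return the path used. */
      def persist[T: TypeInformation](ds: DataSet[T], name: String): String = {
        val path = s"$baseDir/$name"
        ds.write(new TypeSerializerOutputFormat[T], path, WriteMode.OVERWRITE)
        // The batch DataSet API writes lazily, so the plan must be executed
        // (env.execute()) before the persisted files exist on disk.
        path
      }

      /** Read a previously persisted DataSet back with the same element type. */
      def readPersistedDataset[T: ClassTag: TypeInformation](env: ExecutionEnvironment,
                                                             path: String): DataSet[T] = {
        val inputFormat = new TypeSerializerInputFormat[T](implicitly[TypeInformation[T]])
        inputFormat.setFilePath(path)
        env.createInput(inputFormat)
      }
    }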



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/andrewpalumbo/mahout MAHOUT-1817

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/mahout/pull/203.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #203


commit a1cf7cf56e036ce12d616f8aea0af1e9dcdf2cb6
Author: Andrew Palumbo 
Date:   2016-03-25T20:56:20Z

Small change addressing DL's comment on apache/mahout#200, also a small fix




> Implement caching in Flink Bindings
> ---
>
> Key: MAHOUT-1817
> URL: https://issues.apache.org/jira/browse/MAHOUT-1817
> Project: Mahout
>  Issue Type: New Feature
>  Components: Flink
>Affects Versions: 0.11.2
>Reporter: Andrew Palumbo
>Assignee: Andrew Palumbo
>Priority: Blocker
> Fix For: 0.12.0
>
>
> Flink does not have in-memory caching analogous to that of Spark. We need to 
> find a way to honour the {{checkpoint()}} contract in Flink Bindings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (MAHOUT-1790) SparkEngine nnz overflow resultSize when reducing.

2016-03-25 Thread Suneel Marthi (JIRA)

 [ 
https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Suneel Marthi updated MAHOUT-1790:
--
Comment: was deleted

(was: any updates  on this?)

> SparkEngine nnz overflow resultSize when reducing.
> --
>
> Key: MAHOUT-1790
> URL: https://issues.apache.org/jira/browse/MAHOUT-1790
> Project: Mahout
>  Issue Type: Bug
>  Components: spark
>Affects Versions: 0.11.1
>Reporter: Michel Lemay
>Assignee: Dmitriy Lyubimov
>Priority: Minor
> Fix For: 0.12.0
>
>
> When counting numNonZeroElementsPerColumn in the Spark engine with a large 
> number of columns, we get the following error:
> ERROR TaskSetManager: Total size of serialized results of nnn tasks (1031.7 
> MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
> and then, the call stack:
> org.apache.spark.SparkException: Job aborted due to stage failure: Total size 
> of serialized results of 267 tasks (1024.1 MB) is bigger than 
> spark.driver.maxResultSize (1024.0 MB)
> at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
> at scala.Option.foreach(Option.scala:236)
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1942)
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1003)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
> at org.apache.spark.rdd.RDD.reduce(RDD.scala:985)
> at 
> org.apache.mahout.sparkbindings.SparkEngine$.numNonZeroElementsPerColumn(SparkEngine.scala:86)
> at 
> org.apache.mahout.math.drm.CheckpointedOps.numNonZeroElementsPerColumn(CheckpointedOps.scala:37)
> at 
> org.apache.mahout.math.cf.SimilarityAnalysis$.sampleDownAndBinarize(SimilarityAnalysis.scala:286)
> at 
> org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrences(SimilarityAnalysis.scala:66)
> at 
> org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrencesIDSs(SimilarityAnalysis.scala:141)
> This occurs because it uses a DenseVector and Spark seemingly aggregates all 
> of them on the driver before reducing.
> I think this could easily be prevented with a treeReduce(_ += _, depth) 
> instead of a reduce(_ += _).
> 'depth' could be computed as a function of 'n' and numberOfPartitions, 
> something along the lines of:
>   val maxResultSize = 
>   val numPartitions = drm.rdd.partitions.size
>   val n = drm.ncol
>   val bytesPerVector = n * 8 + overhead?
>   val maxVectors = maxResultSize / bytesPerVector / 2 + 1 // be safe
>   val depth = math.max(1, math.ceil(math.log(1 + numPartitions / maxVectors) 
> / math.log(2)).toInt)
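For reference, a hedged, self-contained sketch of that suggestion (this is not 
the actual SparkEngine code; names such as `partialNnz` and the serialization 
overhead constant are assumptions):

    // Sketch of the proposed fix: combine per-partition nnz vectors with treeReduce
    // so partial sums are merged on executors, instead of all partition results
    // being shipped to the driver in a single reduce step.
    import org.apache.spark.rdd.RDD
    import org.apache.mahout.math.Vector
    import org.apache.mahout.math.scalabindings.RLikeOps._

    def reduceNnz(partialNnz: RDD[Vector], ncol: Int, maxResultSizeBytes: Long): Vector = {
      val numPartitions = partialNnz.partitions.length
      // Approximate bytes per dense vector collected to the driver:
      // 8 bytes per cell plus a guessed serialization overhead.
      val bytesPerVector = ncol.toLong * 8 + 128
      // Cap how many vectors may arrive at the driver at once, with a safety margin.
      val maxVectors = math.max(1L, maxResultSizeBytes / bytesPerVector / 2)
      val depth = math.max(1, math.ceil(
        math.log(1.0 + numPartitions.toDouble / maxVectors) / math.log(2)).toInt)

      // treeReduce merges results level by level on executors; only the final layer
      // of partial sums reaches the driver.
      partialNnz.treeReduce(_ += _, depth)
    }

As noted in the comments below, simply raising spark.driver.maxResultSize also 
avoids the failure without changing the reduction.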



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Mahout 0.12.0 pending issues

2016-03-25 Thread Dmitriy Lyubimov
I added a comment to MAHOUT-1790. As posted, the issue is just a matter of Spark
configuration.

On Fri, Mar 25, 2016 at 9:44 AM, Andrew Palumbo  wrote:

> Thx.
>
>  Original message 
> From: Suneel Marthi 
> Date: 03/25/2016 12:36 AM (GMT-05:00)
> To: mahout 
> Subject: Mahout 0.12.0 pending issues
>
> As we are in the final stretch of the 0.12.0 release (the big feature being 
> support for Flink as a backend), we'll be going through a daily bug report to 
> track progress, as has been the practice preceding each major release.
>
> Palumbo
>
> M-1750: implement ABt
> M-1818: dals test fails for flink bindings
> M-1817: implement caching for flink
>
> Musselman
>
> M-1766
> M-1634
> M-1686
>
> Dmitriy
>
> M-1790: SparkEngine nnz overflow result size when reducing
>
> Marthi
>
> M-1814: implement drm2IntKeyed for flink bindings
> M-1749: implement Atx
> M-1711: implement broadcasting
> M-1779: add wiki page for flink backend
> M-1765: add documentation for flink backend
>


[jira] [Comment Edited] (MAHOUT-1790) SparkEngine nnz overflow resultSize when reducing.

2016-03-25 Thread Dmitriy Lyubimov (JIRA)

[ 
https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15212117#comment-15212117
 ] 

Dmitriy Lyubimov edited comment on MAHOUT-1790 at 3/25/16 5:25 PM:
---

[~smarthi]
(1) Actually I don't know why this comment (of mine) got in here. This is a 
different issue altogether -- it is an issue of a Spark setting. If a large 
volume of information is collected to the front end, Spark has a setting that 
limits it to a certain size by default (1 GB), apparently in an attempt to 
maintain the robustness of the backend w.r.t. runaway processes and poor 
programming. If there is a legitimately big collection to the front end that 
exceeds 1 GB, the solution is just to bump up this setting in Spark 
(-Dspark.driver.maxResultSize).

(2) I still have a question as to why the invoked operation would cause such a 
big collection to the front end -- this needs to be investigated. However, this 
capability (numNonZero etc.) was added by either [~ssc] or [~pferrel], so I'd 
suggest these folks take a look at it again. I don't have first-hand knowledge 
of this logic.

If everything else fails, I'll take a look at some point -- but not soon, as it 
is not a priority for me.
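
For reference, one way to raise that limit (the property name comes from the 
error message above; the 2g value is only an example):

    // Example only: raise the driver's result-size limit referenced in the error above.
    // Command-line equivalent: spark-submit --conf spark.driver.maxResultSize=2g ...
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("mahout-job")                   // placeholder application name
      .set("spark.driver.maxResultSize", "2g")    // default is 1g; "0" means unlimited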







was (Author: dlyubimov):
[~smarthi]
(1) Actually i don't know why this comment (of mine) got in here. This is a 
different issue altogether -- it is an issue of a spark setting. if there's a 
collection of large volume of information to the front end, spark has a setting 
to limit it at certain number by default (1Gb), apparently in attempt to 
maintain the robustness of the backend w.r.t. runaway processes and ill 
programming. If there's a legitimate big collection to the front that exceeds 
1G then the solution is just bump up this setting with spark. 
(-Dspark.max.result or something).

(2) I still have a question why would the operation invoked cause such a big 
collection to the front -- this needs to be investigated. However, this 
capability, numNonZero etc. was added by either [~ssc] or [~pferrel], so i'd 
suggest for these folks to take a look at it again. I don't know first hand 
knowledge of this logic. 

If everything else fails, i'll take a look at some point -- but not soon, as it 
is not a priority for me.






> SparkEngine nnz overflow resultSize when reducing.
> --
>
> Key: MAHOUT-1790
> URL: https://issues.apache.org/jira/browse/MAHOUT-1790
> Project: Mahout
>  Issue Type: Bug
>  Components: spark
>Affects Versions: 0.11.1
>Reporter: Michel Lemay
>Assignee: Dmitriy Lyubimov
>Priority: Minor
> Fix For: 0.12.0
>
>
> When counting numNonZeroElementsPerColumn in the Spark engine with a large 
> number of columns, we get the following error:
> ERROR TaskSetManager: Total size of serialized results of nnn tasks (1031.7 
> MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
> and then, the call stack:
> org.apache.spark.SparkException: Job aborted due to stage failure: Total size 
> of serialized results of 267 tasks (1024.1 MB) is bigger than 
> spark.driver.maxResultSize (1024.0 MB)
> at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
> at scala.Option.foreach(Option.scala:236)
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1942)
> at 

[jira] [Commented] (MAHOUT-1790) SparkEngine nnz overflow resultSize when reducing.

2016-03-25 Thread Dmitriy Lyubimov (JIRA)

[ 
https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15212117#comment-15212117
 ] 

Dmitriy Lyubimov commented on MAHOUT-1790:
--

[~smarthi]
(1) Actually I don't know why this comment (of mine) got in here. This is a 
different issue altogether -- it is an issue of a Spark setting. If a large 
volume of information is collected to the front end, Spark has a setting that 
limits it to a certain size by default (1 GB), apparently in an attempt to 
maintain the robustness of the backend w.r.t. runaway processes and poor 
programming. If there is a legitimately big collection to the front end that 
exceeds 1 GB, the solution is just to bump up this setting in Spark 
(-Dspark.driver.maxResultSize).

(2) I still have a question as to why the invoked operation would cause such a 
big collection to the front end -- this needs to be investigated. However, this 
capability (numNonZero etc.) was added by either [~ssc] or [~pferrel], so I'd 
suggest these folks take a look at it again. I don't have first-hand knowledge 
of this logic.

If everything else fails, I'll take a look at some point -- but not soon, as it 
is not a priority for me.






> SparkEngine nnz overflow resultSize when reducing.
> --
>
> Key: MAHOUT-1790
> URL: https://issues.apache.org/jira/browse/MAHOUT-1790
> Project: Mahout
>  Issue Type: Bug
>  Components: spark
>Affects Versions: 0.11.1
>Reporter: Michel Lemay
>Assignee: Dmitriy Lyubimov
>Priority: Minor
> Fix For: 0.12.0
>
>
> When counting numNonZeroElementsPerColumn in the Spark engine with a large 
> number of columns, we get the following error:
> ERROR TaskSetManager: Total size of serialized results of nnn tasks (1031.7 
> MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
> and then, the call stack:
> org.apache.spark.SparkException: Job aborted due to stage failure: Total size 
> of serialized results of 267 tasks (1024.1 MB) is bigger than 
> spark.driver.maxResultSize (1024.0 MB)
> at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
> at scala.Option.foreach(Option.scala:236)
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1942)
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1003)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
> at org.apache.spark.rdd.RDD.reduce(RDD.scala:985)
> at 
> org.apache.mahout.sparkbindings.SparkEngine$.numNonZeroElementsPerColumn(SparkEngine.scala:86)
> at 
> org.apache.mahout.math.drm.CheckpointedOps.numNonZeroElementsPerColumn(CheckpointedOps.scala:37)
> at 
> org.apache.mahout.math.cf.SimilarityAnalysis$.sampleDownAndBinarize(SimilarityAnalysis.scala:286)
> at 
> org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrences(SimilarityAnalysis.scala:66)
> at 
> org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrencesIDSs(SimilarityAnalysis.scala:141)
> This occurs because it uses a DenseVector and Spark seemingly aggregates all 
> of them on the driver before reducing.
> I think this could be easily prevented with a treeReduce(_ +=