[ 
https://issues.apache.org/jira/browse/SPARK-8708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14608214#comment-14608214
 ] 

Antony Mayi commented on SPARK-8708:
------------------------------------

ok, more detailed example showing there really are 5 partitions used in this 
case but eventually the .predictAll() pushes everything to just one. This is 
exactly what I am seeing in production - out of 500 partitions single one gets 
all the millions of predictions in it, all other partitions are empty.

{code}
>>> from operator import itemgetter
>>> from pyspark.mllib.recommendation import ALS
>>> from pyspark import SparkConf
>>> sconf = SparkConf()
>>> sconf.get('spark.default.parallelism')
u'5'
>>> r1 = (1, 1, 1.0)
>>> r2 = (1, 2, 2.0)
>>> r3 = (2, 1, 2.0)
>>> r4 = (2, 2, 2.0)
>>> r5 = (3, 1, 1.0)
>>> ratings = sc.parallelize([r1, r2, r3, r4, r5], 5)
>>> ratings.glom().map(len).collect()
[1, 1, 1, 1, 1]
>>> users = ratings.map(itemgetter(0)).distinct()
>>> users.glom().map(len).collect()
[0, 1, 1, 1, 0]
>>> model = ALS.trainImplicit(ratings, 1, seed=10)
>>> predictions_for_2 = model.predictAll(users.map(lambda u: (u, 2)))
>>> predictions_for_2.glom().map(len).collect()
[0, 0, 3, 0, 0]
{code}

> MatrixFactorizationModel.predictAll() populates single partition only
> ---------------------------------------------------------------------
>
>                 Key: SPARK-8708
>                 URL: https://issues.apache.org/jira/browse/SPARK-8708
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 1.3.0
>            Reporter: Antony Mayi
>
> When using mllib.recommendation.ALS the RDD returned by .predictAll() has all 
> values pushed into single partition despite using quite high parallelism.
> This degrades performance of further processing (I can obviously run 
> .partitionBy()) to balance it but that's still too costly (ie if running 
> .predictAll() in loop for thousands of products) and should be possible to do 
> it rather somehow on the model (automatically)).
> Bellow is an example on tiny sample (same on large dataset):
> {code:title=pyspark}
> >>> r1 = (1, 1, 1.0)
> >>> r2 = (1, 2, 2.0)
> >>> r3 = (2, 1, 2.0)
> >>> r4 = (2, 2, 2.0)
> >>> r5 = (3, 1, 1.0)
> >>> ratings = sc.parallelize([r1, r2, r3, r4, r5], 5)
> >>> ratings.getNumPartitions()
> 5
> >>> users = ratings.map(itemgetter(0)).distinct()
> >>> model = ALS.trainImplicit(ratings, 1, seed=10)
> >>> predictions_for_2 = model.predictAll(users.map(lambda u: (u, 2)))
> >>> predictions_for_2.glom().map(len).collect()
> [0, 0, 3, 0, 0]
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to