[ https://issues.apache.org/jira/browse/SPARK-8708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14608214#comment-14608214 ]
Antony Mayi commented on SPARK-8708:
------------------------------------

OK, here is a more detailed example showing that there really are 5 partitions in use in this case, but that .predictAll() eventually pushes everything into just one. This is exactly what I am seeing in production: out of 500 partitions, a single one gets all the millions of predictions and all the other partitions are empty.

{code}
>>> from operator import itemgetter
>>> from pyspark.mllib.recommendation import ALS
>>> from pyspark import SparkConf
>>> sconf = SparkConf()
>>> sconf.get('spark.default.parallelism')
u'5'
>>> r1 = (1, 1, 1.0)
>>> r2 = (1, 2, 2.0)
>>> r3 = (2, 1, 2.0)
>>> r4 = (2, 2, 2.0)
>>> r5 = (3, 1, 1.0)
>>> ratings = sc.parallelize([r1, r2, r3, r4, r5], 5)
>>> ratings.glom().map(len).collect()
[1, 1, 1, 1, 1]
>>> users = ratings.map(itemgetter(0)).distinct()
>>> users.glom().map(len).collect()
[0, 1, 1, 1, 0]
>>> model = ALS.trainImplicit(ratings, 1, seed=10)
>>> predictions_for_2 = model.predictAll(users.map(lambda u: (u, 2)))
>>> predictions_for_2.glom().map(len).collect()
[0, 0, 3, 0, 0]
{code}

> MatrixFactorizationModel.predictAll() populates single partition only
> ---------------------------------------------------------------------
>
>                 Key: SPARK-8708
>                 URL: https://issues.apache.org/jira/browse/SPARK-8708
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 1.3.0
>            Reporter: Antony Mayi
>
> When using mllib.recommendation.ALS, the RDD returned by .predictAll() has all
> values pushed into a single partition despite quite high parallelism being used.
> This degrades the performance of further processing. I can obviously run
> .partitionBy() to rebalance it, but that is still too costly (e.g. when running
> .predictAll() in a loop for thousands of products), and it should rather be
> possible to do this on the model (automatically).
> Below is an example on a tiny sample (the same happens on a large dataset):
> {code:title=pyspark}
> >>> r1 = (1, 1, 1.0)
> >>> r2 = (1, 2, 2.0)
> >>> r3 = (2, 1, 2.0)
> >>> r4 = (2, 2, 2.0)
> >>> r5 = (3, 1, 1.0)
> >>> ratings = sc.parallelize([r1, r2, r3, r4, r5], 5)
> >>> ratings.getNumPartitions()
> 5
> >>> users = ratings.map(itemgetter(0)).distinct()
> >>> model = ALS.trainImplicit(ratings, 1, seed=10)
> >>> predictions_for_2 = model.predictAll(users.map(lambda u: (u, 2)))
> >>> predictions_for_2.glom().map(len).collect()
> [0, 0, 3, 0, 0]
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
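A minimal pure-Python sketch (no Spark required) of one plausible mechanism behind the skew reported above: if the prediction records end up hash-partitioned on the product id, then requests that all name the same product necessarily collapse into a single partition. The `hash_partition` helper below is a hypothetical stand-in for hash partitioning, not Spark's actual implementation, but it reproduces the observed `[0, 0, 3, 0, 0]` shape.

```python
def hash_partition(pairs, num_partitions):
    """Distribute (key, value) pairs into buckets by hash of the key,
    mimicking how a hash partitioner assigns records to partitions."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions

# The report's scenario: users 1, 2, 3 all ask about product 2.
# Keyed by product, every record shares the key 2, so one bucket gets all three.
by_product = [(2, user) for user in (1, 2, 3)]
print([len(p) for p in hash_partition(by_product, 5)])   # [0, 0, 3, 0, 0]

# Re-keying the same records by user (roughly what an explicit
# .partitionBy() after .predictAll() achieves) spreads them back out.
by_user = [(user, 2) for user in (1, 2, 3)]
print([len(p) for p in hash_partition(by_user, 5)])      # [0, 1, 1, 1, 0]
```

This also illustrates why the `.partitionBy()` workaround mentioned in the description helps: it replaces the degenerate product-based key distribution with one over distinct user ids.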