[ https://issues.apache.org/jira/browse/SPARK-6792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Patrick Wendell resolved SPARK-6792.
------------------------------------
    Resolution: Not A Problem

Resolving per Josh's comment.

> pySpark groupByKey returns rows with the same key
> -------------------------------------------------
>
>                 Key: SPARK-6792
>                 URL: https://issues.apache.org/jira/browse/SPARK-6792
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.3.0
>            Reporter: Charles Hayden
>
> Under some circumstances, pySpark groupByKey returns two or more rows with
> the same groupby key.
> It is not reproducible by a short example, but it can be seen in the
> following program.
> The preservesPartitioning argument is required to see the failure.
> I ran this with cluster_url=local[4], but I think it will also show up with
> cluster_url=local.
> =====================================================
> {noformat}
> # The RDD.groupByKey sometimes gives two results with the same key value.
> # This is incorrect: all results with a single key need to be grouped
> # together.
> # Report the spark version
> from pyspark import SparkContext
> import StringIO
> import csv
>
> sc = SparkContext()
> print sc.version
>
> def loadRecord(line):
>     input = StringIO.StringIO(line)
>     reader = csv.reader(input, delimiter='\t')
>     return reader.next()
>
> # Read data from the MovieLens dataset
> # This can be obtained from
> # http://files.grouplens.org/datasets/movielens/ml-100k.zip
> inputFile = 'u.data'
> input = sc.textFile(inputFile)
> data = input.map(loadRecord)
>
> # Trim off unneeded fields
> data = data.map(lambda row: row[0:2])
> print 'Data Sample'
> print data.take(10)
>
> # Use join to filter the data
> #
> # map builds left key
> # map builds right key
> # join
> # map throws away the key and gets result
>
> # pick a couple of users
> j = sc.parallelize([789, 939])
> # key left
> # conversion to str is required to show the error
> keyed_j = j.map(lambda row: (str(row), None))
> # key right
> keyed_rdd = data.map(lambda row: (str(row[0]), row))
> # join
> joined = keyed_rdd.join(keyed_j)
> # throw away key
> # preservesPartitioning is required to show the error
> res = joined.map(lambda row: row[1][0], preservesPartitioning=True)
> #res = joined.map(lambda row: row[1][0])  # no error
> print 'Filtered Sample'
> print res.take(10)
> #print res.count()
>
> # Do the groupby
> # There should be fewer rows
> keyed_rdd = res.map(lambda row: (row[1], row),
>                     preservesPartitioning=True)
> print 'Input Count', keyed_rdd.count()
> grouped_rdd = keyed_rdd.groupByKey()
> print 'Grouped Count', grouped_rdd.count()
>
> # There are two rows with the same key!
> print 'Group Output Sample'
> print grouped_rdd.filter(lambda row: row[0] == '508').take(10)
> {noformat}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
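A plausible reading of the "Not A Problem" resolution (Josh's comment is not quoted here): preservesPartitioning=True is a promise to Spark that the map did not change the keys, so a later groupByKey may skip the shuffle and group only within each existing partition. The script above changes the keys in those maps, so records sharing a new key can sit in different partitions and surface as duplicate groups. The pure-Python sketch below (no Spark required) illustrates that mechanism; the partitioner, data, and function names are all illustrative, not Spark internals.

```python
# Sketch of groupByKey's behavior when it trusts a (falsely) "preserved"
# partitioning. All names and data here are illustrative.

def partition_by(records, num_partitions):
    """Partition (key, value) pairs by int(key) % num_partitions,
    standing in for a hash partitioner."""
    parts = [[] for _ in range(num_partitions)]
    for k, v in records:
        parts[int(k) % num_partitions].append((k, v))
    return parts

def group_within_partitions(parts):
    """Group by key inside each partition only -- what a shuffle-free
    groupByKey effectively does when partitioning is claimed preserved."""
    out = []
    for part in parts:
        groups = {}
        for k, v in part:
            groups.setdefault(k, []).append(v)
        out.extend(groups.items())
    return out

# Partition by user id, then re-key every record to one new key,
# analogous to the key-changing maps in the reproduction script.
data = [("789", "a"), ("939", "b"), ("100", "c"), ("101", "d")]
parts = partition_by(data, num_partitions=4)
rekeyed_parts = [[("508", v) for _, v in part] for part in parts]

# Claiming the old partitioning still holds: key "508" is spread over
# three partitions, so three separate groups come out for one key.
wrong = group_within_partitions(rekeyed_parts)

# Re-shuffling on the new keys instead yields exactly one group.
flat = [rec for part in rekeyed_parts for rec in part]
right = group_within_partitions(partition_by(flat, num_partitions=4))
```

With the flag misapplied, `wrong` holds three rows all keyed "508"; after a proper re-partition, `right` holds a single group, which matches why omitting preservesPartitioning in the script makes the duplicates disappear.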