[ https://issues.apache.org/jira/browse/SPARK-6792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Charles Hayden updated SPARK-6792:
----------------------------------
Description:
Under some circumstances, pySpark groupByKey returns two or more rows with the same group-by key. It does not reproduce with a short example, but it can be seen in the following program. The preservesPartitioning argument is required to trigger the failure. I ran this with cluster_url=local[4], but I expect it will also show up with cluster_url=local.
=====================================================
{quote}
# RDD.groupByKey sometimes gives two results with the same key value.
# This is incorrect: all results with a single key need to be grouped together.

# Report the Spark version
from pyspark import SparkContext
import StringIO
import csv

sc = SparkContext()
print sc.version

def loadRecord(line):
    input = StringIO.StringIO(line)
    reader = csv.reader(input, delimiter='\t')
    return reader.next()

# Read data from the MovieLens dataset.
# It can be obtained from http://files.grouplens.org/datasets/movielens/ml-100k.zip
inputFile = 'u.data'
input = sc.textFile(inputFile)
data = input.map(loadRecord)

# Trim off unneeded fields
data = data.map(lambda row: row[0:2])
print 'Data Sample'
print data.take(10)

# Use a join to filter the data:
#   map builds the left key
#   map builds the right key
#   join
#   map throws away the key and keeps the result

# Pick a couple of users.
j = sc.parallelize([789, 939])

# Key the left side.
# Conversion to str is required to show the error.
keyed_j = j.map(lambda row: (str(row), None))

# Key the right side.
keyed_rdd = data.map(lambda row: (str(row[0]), row))

# Join.
joined = keyed_rdd.join(keyed_j)

# Throw away the key.
# preservesPartitioning is required to show the error.
res = joined.map(lambda row: row[1][0], preservesPartitioning=True)
#res = joined.map(lambda row: row[1][0])  # no error

print 'Filtered Sample'
print res.take(10)
#print res.count()

# Do the groupby. There should be fewer rows afterwards.
keyed_rdd = res.map(lambda row: (row[1], row), preservesPartitioning=True)
print 'Input Count', keyed_rdd.count()
grouped_rdd = keyed_rdd.groupByKey()
print 'Grouped Count', grouped_rdd.count()

# There are two rows with the same key!
print 'Group Output Sample'
print grouped_rdd.filter(lambda row: row[0] == '508').take(10)
{quote}
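For context, here is a minimal, self-contained sketch of the suspected mechanism. It is not the reporter's program: the data, the re-keying function, and the 4-partition layout are invented for illustration. The idea is that map(..., preservesPartitioning=True) carries the input RDD's partitioner forward; if the map also rewrites the keys, the claimed partitioning is stale, and a later groupByKey that sees a matching partitioner can skip the shuffle and merge values only within each partition, so one key can surface in several groups.
{quote}
# Hypothetical standalone sketch of the suspected mechanism; the dataset,
# key choices, and partition count are made up for illustration.
from pyspark import SparkContext

sc = SparkContext('local[4]', 'spark-6792-sketch')

# Hash-partition 100 pairs by their original keys across 4 partitions.
pairs = sc.parallelize([(str(i), i) for i in range(100)]).partitionBy(4)

# Re-key to 10 new keys, but claim the old partitioning still holds.
rekeyed = pairs.map(lambda kv: (str(kv[1] % 10), kv[1]),
                    preservesPartitioning=True)

# With a matching (but now stale) partitioner, groupByKey can avoid the
# shuffle and group per partition, so a key may appear in several groups.
grouped = rekeyed.groupByKey()
print 'distinct keys:', rekeyed.keys().distinct().count()  # 10
print 'groups:       ', grouped.count()                    # may be > 10
{quote}
If this is the mechanism, dropping preservesPartitioning from any map that changes the keys (or forcing a fresh partitionBy before groupByKey) should restore one group per key.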
> pySpark groupByKey returns rows with the same key
> -------------------------------------------------
>
>                 Key: SPARK-6792
>                 URL: https://issues.apache.org/jira/browse/SPARK-6792
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.3.0
>            Reporter: Charles Hayden
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)