Hi, I am facing an issue while deduplicating keys in an RDD (code snippet below). I have a few Sequence Files, some of which have duplicate entries, and I am trying to drop the duplicate values for each key.
Here are the two methods I tried, with code snippets:

    import org.apache.hadoop.io.Text
    import org.apache.hadoop.mapred.SequenceFileOutputFormat
    import org.apache.nutch.protocol.Content

    val path = "path/to/sequence/file"
    val rdd1 = ctx.sequenceFile(path, classOf[Text], classOf[Content])
    val rdd1Copy = ctx.sequenceFile(path, classOf[Text], classOf[Content]) // duplicates for the sake of testing
    val rdds = Array(rdd1, rdd1Copy)
    val rdd = ctx.union(rdds) // club all parts together, with duplicates

    // Method 1: group by key, then map each group to pick one value
    rdd.groupByKey()
       .map(rec => (rec._1, rec._2.iterator.next()))
       .saveAsHadoopFile(output + "-method1", classOf[Text], classOf[Content],
         classOf[SequenceFileOutputFormat[Text, Content]]) // save it

    // Method 2: reduce by key, dropping the duplicate values
    rdd.reduceByKey((v1, v2) => v1)
       .saveAsHadoopFile(output + "-method2", classOf[Text], classOf[Content],
         classOf[SequenceFileOutputFormat[Text, Content]]) // save it

Method 1 works fine, but groupByKey looks expensive. Method 2 is more interesting, but it is not working as expected. The issues are: (1) it removes far more entries than it is supposed to, and (2) it introduces duplicates. In my test I had 47,000 unique records (duplicated by the union for the test). Method 1 got it right, but Method 2 produced only 1,790 records, of which only 30 were unique. The number 30 was also the number of stages the job took to run.

Can you please help me understand why Method 2 is not the right way to remove duplicate values, or what is wrong with the code snippet?

p.s. I got Method 2 from an accepted answer on StackOverflow: http://stackoverflow.com/a/31656056/1506477

Thanks and regards,
Thamme

--
*Thamme Gowda N.*
Grad Student at usc.edu
Twitter: @thammegowda <https://twitter.com/thammegowda>
Website: http://scf.usc.edu/~tnarayan/
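To make clear what I expect Method 2 to do, here is a minimal Spark-free sketch of the intended "keep one value per key" semantics. Plain strings and ints stand in for Text/Content, and `keepFirstPerKey` is just an illustrative helper, not part of my job:

```scala
// Sketch of what I expect reduceByKey((v1, v2) => v1) to achieve:
// one value retained per key, duplicates dropped.
object DedupSketch {
  // Fold over the records, keeping only the first value seen for each key.
  def keepFirstPerKey[K, V](records: Seq[(K, V)]): Map[K, V] =
    records.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
      if (acc.contains(k)) acc else acc + (k -> v)
    }

  def main(args: Array[String]): Unit = {
    val data = Seq(("a", 1), ("b", 2), ("a", 3), ("b", 2))
    val result = keepFirstPerKey(data)
    assert(result == Map("a" -> 1, "b" -> 2)) // duplicates for "a" and "b" dropped
    println(result)
  }
}
```

(In Spark the merge order of `reduceByKey` across partitions is not deterministic, so "first" is not guaranteed, but since my duplicates are exact copies, either value would be fine.)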