Hi,

I am facing an issue while deduplicating keys in an RDD (code snippets
below).
I have a few Sequence Files, some of which contain duplicate entries. I am
trying to keep exactly one value per key.

Here are the two methods I tried, with code snippets:

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.SequenceFileOutputFormat
// `ctx` is the SparkContext; Content is the Writable value class of our records

val path = "path/to/sequence/file"
val output = "path/to/output"
val rdd1 = ctx.sequenceFile(path, classOf[Text], classOf[Content])
val rdd1Copy = ctx.sequenceFile(path, classOf[Text], classOf[Content])
// read the same file twice to create duplicates for the sake of testing
val rdds = Array(rdd1, rdd1Copy)
val rdd = ctx.union(rdds) // club all parts together, now with duplicates

// Method 1: group by key, then map each group to a single value
rdd.groupByKey()
  .map(rec => (rec._1, rec._2.iterator.next())) // keep the first value per key
  .saveAsHadoopFile(output + "-method1", classOf[Text],
    classOf[Content], classOf[SequenceFileOutputFormat[Text, Content]]) // save it

// Method 2: reduce by key, dropping the duplicate values
rdd.reduceByKey((v1, v2) => v1) // keep an arbitrary one of the values per key
  .saveAsHadoopFile(output + "-method2", classOf[Text],
    classOf[Content], classOf[SequenceFileOutputFormat[Text, Content]]) // save it


Method 1 works fine, but groupByKey looks expensive: it shuffles every value
for a key across the network before the duplicates are dropped.

Method 2 is more appealing, since reduceByKey can combine values map-side
instead of shuffling whole groups, but it is not working as expected.
Two problems: (1) it removes far more entries than it should, and (2) it
introduces duplicates of its own.

In my test I had 47,000 unique records (each duplicated once by the union
above). Method 1 got all of them right; Method 2 produced only 1,790 records,
of which just 30 were unique.
Curiously, 30 was also the number of stages the job took to run.
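
As a sanity check, I believe the same first-value reduce behaves correctly on
plain Scala pairs, which makes me suspect the problem is specific to the
Hadoop Writable records rather than to reduceByKey itself. A minimal local
sketch (not from my actual job):

// sanity check: first-value reduce on plain String pairs, no Writables involved
val plain = ctx.parallelize(Seq(("k1", "a"), ("k1", "a"), ("k2", "b"), ("k2", "b")))
plain.reduceByKey((v1, v2) => v1)
  .collect()
  .foreach(println) // expect exactly two records: (k1,a) and (k2,b)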

Can you please help me understand why Method 2 is not the right way to
remove duplicate values, or what is wrong with the code snippet?
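
One theory I have (please correct me if this is off): the scaladoc for
SparkContext.sequenceFile warns that Hadoop's RecordReader reuses the same
Writable object for every record, so directly caching, sorting, or
aggregating such an RDD creates many references to one mutable object, and
it recommends first copying the records with a map. If that is what is
happening here, a copy step like the sketch below should make Method 2
behave. This assumes Content is a standard Writable with a no-arg
constructor, since WritableUtils.clone copies via serialization:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.WritableUtils

// copy every record into fresh objects before any shuffle/aggregation,
// so the pairs no longer alias the single Writable instance the reader reuses
val safeRdd = rdd.mapPartitions { iter =>
  val conf = new Configuration() // created per partition: Configuration is not serializable
  iter.map { case (k, v) => (new Text(k), WritableUtils.clone(v, conf)) }
}
safeRdd.reduceByKey((v1, v2) => v1)
  .saveAsHadoopFile(output + "-method2-copied", classOf[Text],
    classOf[Content], classOf[SequenceFileOutputFormat[Text, Content]])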

p.s.
    I got Method 2 from an accepted answer on StackOverflow:
http://stackoverflow.com/a/31656056/1506477

-
Thanks and regards
Thamme


--
Thamme Gowda N.
Grad Student at usc.edu
Twitter: @thammegowda  <https://twitter.com/thammegowda>
Website: http://scf.usc.edu/~tnarayan/
