Hi all,

I'm using Spark to process some corpora, and I need to count the occurrences of each 2-gram. I started by counting tuples of (wordID1, wordID2), which worked fine apart from high memory usage and GC overhead caused by the huge number of small tuple objects. I then tried packing each pair of Ints into a single Long; the GC overhead did drop substantially, but the runtime increased several-fold.
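For concreteness, the packing scheme looks roughly like this (a minimal sketch; the & 0xFFFFFFFFL mask on the low word only matters if the second ID could be negative, and my word IDs are all non-negative):

// pack two non-negative Ints into one Long: high 32 bits = a, low 32 bits = b
def pack(a: Int, b: Int): Long = (a.toLong << 32) | (b & 0xFFFFFFFFL)

// recover the two Ints from the packed Long
def unpackFirst(k: Long): Int  = (k >>> 32).toInt
def unpackSecond(k: Long): Int = k.toInt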
I ran some small experiments with random data drawn from different distributions, and the performance problem seems to show up only with exponentially distributed data. Example code is attached below. The job splits into two stages, flatMap() and count(). When counting Tuples, flatMap() takes about 6s and count() about 2s; when counting Longs, flatMap() takes 18s and count() takes 10s. I haven't looked into Spark's implementation of flatMap/reduceByKey, but my guess is that Spark has some specialization for Long keys that happens to perform poorly on certain distributions. Does anyone have ideas about this?

Best wishes,
Richard

// sc is the SparkContext (e.g. from spark-shell)
import scala.util.Random

// lines of word IDs, drawn from an (approximately) exponential distribution
val data = (1 to 5000).par.map { _ =>
  (1 to 1000).map { _ => (-1000 * Math.log(Random.nextDouble)).toInt }
}.seq

// count Tuples: fast (~6s flatMap, ~2s count)
sc.parallelize(data).flatMap { line =>
  // pair each word ID with its successor to form the 2-grams
  val first = line.iterator
  val second = line.iterator.drop(1)
  for (pair <- first.zip(second)) yield (pair, 1L)
}.reduceByKey(_ + _).count()

// count Longs: slow (~18s flatMap, ~10s count)
sc.parallelize(data).flatMap { line =>
  val first = line.iterator
  val second = line.iterator.drop(1)
  // pack the pair into one Long: high 32 bits = first ID, low 32 bits = second
  for ((a, b) <- first.zip(second)) yield ((a.toLong << 32) | b, 1L)
}.reduceByKey(_ + _).count()
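One more data point that might be relevant (just a local sketch, not a Spark job; the 100-line sample size is arbitrary): as far as I understand, reduceByKey hashes keys for its map-side combine, and java.lang.Long.hashCode XORs the high and low 32 bits, so the packed key hashes to roughly a ^ b. Comparing distinct hash counts for the two encodings would show whether the Long keys simply collide much more:

// sketch: does the packed-Long encoding hash much worse than the Tuple2 one?
val pairs = data.take(100).flatMap { line => line.zip(line.drop(1)) }
val distinctPairs = pairs.distinct.size
val tupleHashes = pairs.map(_.hashCode).distinct.size
val longHashes = pairs.map { case (a, b) => ((a.toLong << 32) | b).hashCode }.distinct.size
println(s"pairs: $distinctPairs, tuple hashes: $tupleHashes, long hashes: $longHashes")

If longHashes comes out far smaller than distinctPairs while tupleHashes stays close to it, hash collisions alone might explain the slowdown.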