hi folks,

I was debugging a Spark job that ended up with too few partitions during
the join step, and thought I'd reach out to understand whether this is the
expected behavior and what the typical workarounds are.

I have two RDDs that I'm joining: one with a lot of partitions (5K+) and
one with far fewer (< 50). I perform a reduceByKey on the smaller RDD and
then join the two together. I notice that the join operation ends up with
numPartitions = smallerRDD.numPartitions. This seems to stem from the code
in Partitioner.defaultPartitioner
<https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala#L57>.
That code checks whether either of the RDDs already has a partitioner set
and, if so, reuses that RDD's partitioner and numPartitions. In my case,
because I call reduceByKey on the smaller RDD, it ends up with a
partitioner set, so the join inherits that partitioner along with its much
smaller partition count.
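
Roughly, this is what I read that code as doing (my own paraphrase to
illustrate the effect, not the actual source):

import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD

// Paraphrase: prefer any existing partitioner among the inputs,
// regardless of how many partitions the other RDDs have.
def defaultPartitionerSketch(rdd: RDD[_], others: RDD[_]*): Partitioner = {
  val rdds = Seq(rdd) ++ others
  val withPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
  if (withPartitioner.nonEmpty) {
    // here the smaller RDD's partitioner (from reduceByKey) wins
    withPartitioner.maxBy(_.partitions.length).partitioner.get
  } else {
    // only when no input has a partitioner does it size a new
    // HashPartitioner off the largest RDD (or spark.default.parallelism)
    new HashPartitioner(rdds.map(_.partitions.length).max)
  }
}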

I'm currently working around this by specifying the number of partitions I
want on the join (see the snippet and workaround sketch below), but I was
wondering if others have run into this and whether there are other
suggested workarounds, e.g. partitioning my larger RDD as well. Would it
make sense for defaultPartitioner to account for the case where one RDD has
a much larger number of partitions?

Here's a simple snippet that illustrates things:

val largeRDD = sc.parallelize(
  List((1, 10), (1, 11), (2, 20), (2, 21), (3, 30), (3, 31)), 100)
val smallRDD = sc.parallelize(
  List((1, "one"), (2, "two"), (3, "three")), 2).reduceByKey((l, _) => l)
// ends up with a join with 2 partitions
largeRDD.join(smallRDD).collect().foreach(println)
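
And the kind of workaround I mean: either pass the partition count
explicitly to join, or hash-partition the larger RDD up front so that its
partitioner should be the one defaultPartitioner picks (the 100 here is
just a placeholder):

// pass the desired partition count explicitly to join
largeRDD.join(smallRDD, largeRDD.partitions.length).collect().foreach(println)

// or hash-partition the larger RDD first so its partitioner is reused
import org.apache.spark.HashPartitioner
val partitionedLarge = largeRDD.partitionBy(new HashPartitioner(100))
partitionedLarge.join(smallRDD).collect().foreach(println)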


Thanks,

-- 
- Piyush
