Hi! This is weird. The code of foreachPartition <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L801-L806> leads to ParallelCollectionRDD <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L84-L107>, which ends up in slice <https://github.com/apache/spark/blob/f643bd96593dc411cb0cca1c7a3f28f93765c9b6/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L116-L155>, where the most important part is the *positions* method:
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
  (0 until numSlices).iterator.map { i =>
    val start = ((i * length) / numSlices).toInt
    val end = (((i + 1) * length) / numSlices).toInt
    (start, end)
  }
}

Because of the extra ' (' you used in "*parallelize( (Array*" I thought some
Scala implicit might generate a Seq with one Array in it. But in that case
your output would contain an Array, so that must not be the case.

1) What Spark/Scala version are you using, and on what OS?

2) Can you reproduce this issue in the spark-shell?

scala> case class Animal(id:Int, name:String)
defined class Animal

scala> val myRDD = sc.parallelize( (Array( Animal(1, "Lion"), Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"), Animal(5, "Chetah") ) ), 12)
myRDD: org.apache.spark.rdd.RDD[Animal] = ParallelCollectionRDD[2] at parallelize at <console>:27

scala> myRDD.foreachPartition( e => { println("----------"); e.foreach(println) } )
----------
----------
----------
Animal(1,Lion)
----------
----------
Animal(2,Elephant)
----------
----------
----------
Animal(3,Jaguar)
----------
----------
Animal(4,Tiger)
----------
----------
Animal(5,Chetah)

scala> Console println myRDD.getNumPartitions
12

3) Can you please check in the spark-shell what happens when you paste the above method and call it like this:

scala> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
     |   (0 until numSlices).iterator.map { i =>
     |     val start = ((i * length) / numSlices).toInt
     |     val end = (((i + 1) * length) / numSlices).toInt
     |     (start, end)
     |   }
     | }
positions: (length: Long, numSlices: Int)Iterator[(Int, Int)]

scala> positions(5, 12).foreach(println)
(0,0)
(0,0)
(0,1)
(1,1)
(1,2)
(2,2)
(2,2)
(2,3)
(3,3)
(3,4)
(4,4)
(4,5)

As you can see, in my case the `positions` result is consistent with the
`foreachPartition` output, and this should be deterministic.
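To make the arithmetic concrete, here is a small standalone sketch (plain Scala, no Spark needed) that reproduces the slicing: it applies the same integer-division formula as above and prints which of the 5 elements each of the 12 slices receives. The `SliceDemo` object and the `animals` array are just illustrative stand-ins for the data in your example, not Spark code.

    object SliceDemo {
      // Same formula as ParallelCollectionRDD.slice: integer division makes
      // the slice boundaries deterministic for a given (length, numSlices).
      def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
        (0 until numSlices).iterator.map { i =>
          val start = ((i * length) / numSlices).toInt
          val end = (((i + 1) * length) / numSlices).toInt
          (start, end)
        }

      def main(args: Array[String]): Unit = {
        val animals = Array("Lion", "Elephant", "Jaguar", "Tiger", "Chetah")
        positions(animals.length, 12).zipWithIndex.foreach {
          case ((start, end), slice) =>
            // An empty (start, end) range means that partition gets no elements.
            println(s"slice $slice -> ${animals.slice(start, end).mkString(", ")}")
        }
      }
    }

Running this shows five non-empty slices (indices 2, 4, 7, 9, 11), one element each, matching the foreachPartition output above.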
Best regards,
Attila

On Tue, Mar 16, 2021 at 5:34 AM Renganathan Mutthiah <
renganatha...@gmail.com> wrote:

> Hi,
>
> I have a question with respect to default partitioning in RDD.
>
> case class Animal(id:Int, name:String)
> val myRDD = session.sparkContext.parallelize( (Array( Animal(1, "Lion"),
> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"), Animal(5,
> "Chetah") ) ))
> Console println myRDD.getNumPartitions
>
> I am running the above piece of code on my laptop, which has 12 logical
> cores. Hence I see that 12 partitions are created.
>
> My understanding is that hash partitioning is used to determine which
> object needs to go to which partition. So in this case, the formula would
> be: hashCode() % 12. But when I examine further, I see that all the
> objects are put in the last partition.
>
> myRDD.foreachPartition( e => { println("----------"); e.foreach(println) } )
>
> The above code prints the following (the first eleven partitions are
> empty and the last one has all the objects; the dashed lines separate the
> partition contents):
>
> ----------
> ----------
> ----------
> ----------
> ----------
> ----------
> ----------
> ----------
> ----------
> ----------
> ----------
> ----------
> Animal(2,Elephant)
> Animal(4,Tiger)
> Animal(3,Jaguar)
> Animal(5,Chetah)
> Animal(1,Lion)
>
> I don't know why this happens. Can you please help?
>
> Thanks!