
This is weird. The code path from foreachPartition leads to
ParallelCollectionRDD, which ends in slice, where the most important
part is the *positions* method:

  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
    (0 until numSlices).iterator.map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }
  }

Because of the extra '(' you used in "*parallelize( (Array*", I thought
some Scala implicit might generate a Seq with one Array in it.
But in that case your output would contain an Array, so this cannot be
the case.
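
As a quick sanity check of that reasoning, here is a minimal plain-Scala
sketch (the object and value names are made up for illustration) showing
that redundant parentheses around an expression only group it and do not
wrap the Array in another collection:

```scala
object ParenSketch {
  // Hypothetical names, for illustration only.
  val plain: Array[Int] = Array(1, 2, 3)

  // The extra parentheses only group the expression; the static type is
  // still Array[Int], otherwise this annotation would not compile.
  val wrapped: Array[Int] = (Array(1, 2, 3))

  def main(args: Array[String]): Unit = {
    println(wrapped.length)
    println(wrapped.sameElements(plain))
  }
}
```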

1) What Spark/Scala version are you using, and on what OS?

2) Can you reproduce this issue in the spark-shell?

scala> case class Animal(id:Int, name:String)
defined class Animal

scala> val myRDD = sc.parallelize( (Array( Animal(1, "Lion"),
     |   Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"),
     |   Animal(5, "Chetah") ) ), 12)
myRDD: org.apache.spark.rdd.RDD[Animal] = ParallelCollectionRDD[2] at
parallelize at <console>:27

scala> myRDD.foreachPartition( e => { println("----------");
e.foreach(println) } )

scala> Console println myRDD.getNumPartitions

3) Can you please check in spark-shell what happens when you paste the
above method and call it like this:

scala> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
     |   (0 until numSlices).iterator.map { i =>
     |     val start = ((i * length) / numSlices).toInt
     |     val end = (((i + 1) * length) / numSlices).toInt
     |     (start, end)
     |   }
     | }
positions: (length: Long, numSlices: Int)Iterator[(Int, Int)]

scala> positions(5, 12).foreach(println)

In my case the `positions` result is consistent with the
`foreachPartition` output, and this should be deterministic.
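
To see the slicing without a cluster, here is a minimal sketch in plain
Scala. It reuses the arithmetic of the positions method quoted above;
`sliceLocally` is a hypothetical helper written for this mail, not a Spark
API, and it only assumes that each partition receives the index range
[start, end) of the input sequence:

```scala
object SliceSketch {
  // Same arithmetic as the positions method quoted above.
  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
    (0 until numSlices).iterator.map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }

  // Hypothetical helper: cut a local Seq into numSlices pieces by index
  // range, with no hashing involved.
  def sliceLocally[T](data: Seq[T], numSlices: Int): List[Seq[T]] =
    positions(data.length, numSlices).map { case (start, end) =>
      data.slice(start, end)
    }.toList

  def main(args: Array[String]): Unit = {
    val names = Seq("Lion", "Elephant", "Jaguar", "Tiger", "Chetah")
    sliceLocally(names, 12).zipWithIndex.foreach { case (slice, i) =>
      println(s"partition $i: ${slice.mkString(", ")}")
    }
  }
}
```

With this arithmetic, five elements over twelve slices end up one each in
slices 2, 4, 7, 9 and 11 (counting from 0), and the remaining slices are
empty. The result depends only on the element positions, not on hashCode().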

Best regards,

On Tue, Mar 16, 2021 at 5:34 AM Renganathan Mutthiah <
renganatha...@gmail.com> wrote:

> Hi,
> I have a question with respect to default partitioning in RDD.
> case class Animal(id:Int, name:String)
> val myRDD = session.sparkContext.parallelize( (Array( Animal(1, "Lion"),
>   Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"),
>   Animal(5, "Chetah") ) ))
> Console println myRDD.getNumPartitions
> I am running the above piece of code in my laptop which has 12 logical
> cores.
> Hence I see that there are 12 partitions created.
> My understanding is that hash partitioning is used to determine which
> object needs to go to which partition. So in this case, the formula would
> be: hashCode() % 12
> But when I examine further, I see all the objects are put in the last
> partition.
> myRDD.foreachPartition( e => { println("----------"); e.foreach(println)
> } )
> The above code prints the following (the first eleven partitions are empty
> and the last one has all the objects; the dashed lines separate the
> partition contents):
> ----------
> ----------
> ----------
> ----------
> ----------
> ----------
> ----------
> ----------
> ----------
> ----------
> ----------
> ----------
> Animal(2,Elephant)
> Animal(4,Tiger)
> Animal(3,Jaguar)
> Animal(5,Chetah)
> Animal(1,Lion)
> I don't know why this happens. Can you please help.
> Thanks!
