Hi!

This is weird. The code of parallelize
<https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L801-L806>
leads to ParallelCollectionRDD
<https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L84-L107>,
which ends in slice
<https://github.com/apache/spark/blob/f643bd96593dc411cb0cca1c7a3f28f93765c9b6/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L116-L155>,
where the most important part is the *positions* method:

  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
    (0 until numSlices).iterator.map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }
  }

Because of the extra '(' you used in "*parallelize( (Array*", I first thought
some Scala implicit might generate a Seq with a single Array in it.
But in that case your output would contain an Array, so this cannot be
the case.
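
As far as I know, redundant parentheses around a single expression are
plain grouping in Scala and do not change its type, so no wrapping Seq or
tuple is created. A quick check you can paste into any Scala REPL (the
value names `plain` and `wrapped` are only for illustration):

```scala
// Extra parentheses around one expression are just grouping;
// they do not build a tuple or a Seq around the Array.
val plain: Array[Int] = Array(1, 2, 3)
val wrapped: Array[Int] = (Array(1, 2, 3))

// The type annotations compile, so both values are Array[Int];
// the checks below compare their contents.
println(wrapped.length)
println(plain.sameElements(wrapped))
```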

1) Which Spark and Scala versions are you using, and on what OS?

2) Can you reproduce this issue in the spark-shell? This is what I get:

scala> case class Animal(id:Int, name:String)
defined class Animal

scala> val myRDD = sc.parallelize( (Array( Animal(1, "Lion"),
     |   Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"),
     |   Animal(5, "Chetah") ) ), 12)
myRDD: org.apache.spark.rdd.RDD[Animal] = ParallelCollectionRDD[2] at parallelize at <console>:27

scala> myRDD.foreachPartition( e => { println("----------"); e.foreach(println) } )
----------
----------
----------
Animal(1,Lion)
----------
----------
Animal(2,Elephant)
----------
----------
----------
Animal(3,Jaguar)
----------
----------
Animal(4,Tiger)
----------
----------
Animal(5,Chetah)

scala> Console println myRDD.getNumPartitions
12

3) Can you please check in the spark-shell what happens when you paste the
above method and call it like this:

scala> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
     |   (0 until numSlices).iterator.map { i =>
     |     val start = ((i * length) / numSlices).toInt
     |     val end = (((i + 1) * length) / numSlices).toInt
     |     (start, end)
     |   }
     | }
positions: (length: Long, numSlices: Int)Iterator[(Int, Int)]

scala> positions(5, 12).foreach(println)
(0,0)
(0,0)
(0,1)
(1,1)
(1,2)
(2,2)
(2,2)
(2,3)
(3,3)
(3,4)
(4,4)
(4,5)

As you can see, in my case the `positions` result is consistent with the
`foreachPartition` output: only the slices with start < end (the 3rd, 5th,
8th, 10th and 12th) contain an element. This should be deterministic.
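
To make the connection explicit, here is a minimal sketch in plain Scala
(no Spark needed) that applies those ranges to a five-element sequence,
roughly the way the slice code linked above does for a generic Seq. The
helper name `sliceByPositions` and the value names are mine, not Spark's,
and this is a simplification rather than the actual Spark implementation.
It is meant to be pasted into the spark-shell or a Scala REPL:

```scala
// Same arithmetic as the positions method quoted earlier in this mail.
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
  (0 until numSlices).iterator.map { i =>
    val start = ((i * length) / numSlices).toInt
    val end = (((i + 1) * length) / numSlices).toInt
    (start, end)
  }

// Hypothetical helper (my name, not Spark's): cut a sequence into numSlices
// pieces using the half-open [start, end) ranges produced by positions.
def sliceByPositions[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] =
  positions(data.length, numSlices).map { case (start, end) =>
    data.slice(start, end)
  }.toVector

val animals = Seq("Lion", "Elephant", "Jaguar", "Tiger", "Chetah")
val slices = sliceByPositions(animals, 12)

// Print every slice with its index; empty slices print nothing after the colon.
slices.zipWithIndex.foreach { case (s, i) =>
  println(s"partition $i: ${s.mkString(", ")}")
}
```

With 5 elements and 12 slices, only the slices at index 2, 4, 7, 9 and 11
are non-empty, each holding exactly one element.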

Best regards,
Attila


On Tue, Mar 16, 2021 at 5:34 AM Renganathan Mutthiah <
renganatha...@gmail.com> wrote:

> Hi,
>
> I have a question with respect to default partitioning in RDD.
>
>
> case class Animal(id:Int, name:String)
> val myRDD = session.sparkContext.parallelize( (Array( Animal(1, "Lion"),
> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"), Animal(5,
> "Chetah") ) ))
> Console println myRDD.getNumPartitions
>
> I am running the above piece of code on my laptop, which has 12 logical
> cores.
> Hence I see that there are 12 partitions created.
>
> My understanding is that hash partitioning is used to determine which
> object needs to go to which partition. So in this case, the formula would
> be: hashCode() % 12
> But when I examine further, I see that all the objects are put in the last
> partition.
>
> myRDD.foreachPartition( e => { println("----------"); e.foreach(println)
> } )
>
> The above code prints the output below (the first eleven partitions are
> empty and the last one has all the objects; the dashed lines separate the
> partition contents):
> ----------
> ----------
> ----------
> ----------
> ----------
> ----------
> ----------
> ----------
> ----------
> ----------
> ----------
> ----------
> Animal(2,Elephant)
> Animal(4,Tiger)
> Animal(3,Jaguar)
> Animal(5,Chetah)
> Animal(1,Lion)
>
> I don't know why this happens. Can you please help.
>
> Thanks!
>
