Oh, sure that was the reason. You can keep using the `foreachPartition` and
get the partition ID from the `TaskContext`:

scala> import org.apache.spark.TaskContext
import org.apache.spark.TaskContext

scala> myRDD.foreachPartition( e => {  println(TaskContext.getPartitionId +
":" + e.mkString(",")) } )
0:
1:
2:Animal(1,Lion)
3:
4:Animal(2,Elephant)
5:
6:
7:Animal(3,Jaguar)
8:
9:Animal(4,Tiger)
10:
11:Animal(5,Chetah)

scala>




On Tue, Mar 16, 2021 at 2:38 PM German Schiavon <gschiavonsp...@gmail.com>
wrote:

> Hi all,
>
> I guess you could do something like this too:
>
> [image: Captura de pantalla 2021-03-16 a las 14.35.46.png]
>
> On Tue, 16 Mar 2021 at 13:31, Renganathan Mutthiah <
> renganatha...@gmail.com> wrote:
>
>> Hi Attila,
>>
>> Thanks for looking into this!
>>
>> I actually found the issue and it turned out to be that the print
>> statements misled me. The records are indeed stored in different partitions.
>> What happened is since the foreachpartition method is run parallelly by
>> different threads, they all printed the first line almost at the same time
>> and followed by data which is also printed at almost the same time. This
>> has given an appearance that all the data is stored in a single partition.
>> When I run the below code, I can see that the objects are stored in
>> different partitions of course!
>>
>> *myRDD.mapPartitionsWithIndex( (index,itr) => { itr.foreach( e =>
>> println("Index : " +index +" " + e)); itr}, true).collect()*
>>
>> Prints the below... (index: ?  the ? is actually the partition number)
>> *Index : 9 Animal(4,Tiger) Index : 4 Animal(2,Elephant) Index : 11
>> Animal(5,Chetah) Index : 2 Animal(1,Lion) Index : 7 Animal(3,Jaguar) *
>>
>> Thanks!
>>
>> On Tue, Mar 16, 2021 at 3:06 PM Attila Zsolt Piros <
>> piros.attila.zs...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> This is weird. The code of foreachPartition
>>> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L801-L806>
>>>  leads
>>> to ParallelCollectionRDD
>>> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L84-L107>
>>>  which
>>> ends in slice
>>> <https://github.com/apache/spark/blob/f643bd96593dc411cb0cca1c7a3f28f93765c9b6/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L116-L155>,
>>> where the most important part is the *positions* method:
>>>
>>>  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
>>>      (0 until numSlices).iterator.map { i =>
>>>         val start = ((i * length) / numSlices).toInt
>>>         val end = (((i + 1) * length) / numSlices).toInt
>>>         (start, end)
>>>      }
>>>  }
>>>
>>> Because of the extra ' (' you used in "*parallelize( (Array*" I thought
>>> some scala implicit might generate a Seq with one Array in it.
>>> But in that case your output would contain an Array. So this must be not
>>> the case.
>>>
>>> 1) What Spark/Scala version you are using? on what OS?
>>>
>>> 2)  Can you reproduce this issue in the spark-shell?
>>>
>>> scala> case class Animal(id:Int, name:String)
>>> defined class Animal
>>>
>>> scala> val myRDD = sc.parallelize( (Array( Animal(1, "Lion"),
>>> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"
>>> Tiger"), Animal(5, "Chetah") ) ), 12)
>>> myRDD: org.apache.spark.rdd.RDD[Animal] = ParallelCollectionRDD[2] at
>>> parallelize at <console>:27
>>>
>>> scala> myRDD.foreachPartition( e => { println("----------");
>>> e.foreach(println) } )
>>> ----------
>>> ----------
>>> ----------
>>> Animal(1,Lion)
>>> ----------
>>> ----------
>>> Animal(2,Elephant)
>>> ----------
>>> ----------
>>> ----------
>>> Animal(3,Jaguar)
>>> ----------
>>> ----------
>>> Animal(4,Tiger)
>>> ----------
>>> ----------
>>> Animal(5,Chetah)
>>>
>>> scala> Console println myRDD.getNumPartitions
>>> 12
>>>
>>> 3) Can you please check spark-shell what happens when you paste the
>>> above method and call it like:
>>>
>>> scala> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)]
>>> = {
>>>      |   (0 until numSlices).iterator.map { i =>
>>>      |     val start = ((i * length) / numSlices).toInt
>>>      |       val end = (((i + 1) * length) / numSlices).toInt
>>>      |       (start, end)
>>>      |   }
>>>      | }
>>> positions: (length: Long, numSlices: Int)Iterator[(Int, Int)]
>>>
>>> scala> positions(5, 12).foreach(println)
>>> (0,0)
>>> (0,0)
>>> (0,1)
>>> (1,1)
>>> (1,2)
>>> (2,2)
>>> (2,2)
>>> (2,3)
>>> (3,3)
>>> (3,4)
>>> (4,4)
>>> (4,5)
>>>
>>> As you can see in my case the `positions` result consistent with the 
>>> `foreachPartition`
>>> and this should be deterministic.
>>>
>>> Best regards,
>>> Attila
>>>
>>>
>>> On Tue, Mar 16, 2021 at 5:34 AM Renganathan Mutthiah <
>>> renganatha...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a question with respect to default partitioning in RDD.
>>>>
>>>>
>>>>
>>>>
>>>> *case class Animal(id:Int, name:String)   val myRDD =
>>>> session.sparkContext.parallelize( (Array( Animal(1, "Lion"),
>>>> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"), Animal(5,
>>>> "Chetah") ) ))Console println myRDD.getNumPartitions  *
>>>>
>>>> I am running the above piece of code in my laptop which has 12 logical
>>>> cores.
>>>> Hence I see that there are 12 partitions created.
>>>>
>>>> My understanding is that hash partitioning is used to determine which
>>>> object needs to go to which partition. So in this case, the formula would
>>>> be: hashCode() % 12
>>>> But when I further examine, I see all the RDDs are put in the last
>>>> partition.
>>>>
>>>> *myRDD.foreachPartition( e => { println("----------");
>>>> e.foreach(println) } )*
>>>>
>>>> Above code prints the below(first eleven partitions are empty and the
>>>> last one has all the objects. The line is separate the partition contents):
>>>> ----------
>>>> ----------
>>>> ----------
>>>> ----------
>>>> ----------
>>>> ----------
>>>> ----------
>>>> ----------
>>>> ----------
>>>> ----------
>>>> ----------
>>>> ----------
>>>> Animal(2,Elephant)
>>>> Animal(4,Tiger)
>>>> Animal(3,Jaguar)
>>>> Animal(5,Chetah)
>>>> Animal(1,Lion)
>>>>
>>>> I don't know why this happens. Can you please help.
>>>>
>>>> Thanks!
>>>>
>>>

Reply via email to