Oh, sure, that was the reason. You can keep using `foreachPartition` and get the partition ID from the `TaskContext`.
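A minimal self-contained sketch of that (assuming a spark-shell session, so `sc` is already bound; the `Animal` class and data are taken from the original question quoted below):

import org.apache.spark.TaskContext

case class Animal(id: Int, name: String)

val myRDD = sc.parallelize(Array(Animal(1, "Lion"), Animal(2, "Elephant"),
  Animal(3, "Jaguar"), Animal(4, "Tiger"), Animal(5, "Chetah")), 12)

// Each task prints the id of the partition it processes, followed by
// that partition's elements.
myRDD.foreachPartition { it =>
  println(TaskContext.getPartitionId + ":" + it.mkString(","))
}

In the spark-shell it looks like this: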
scala> import org.apache.spark.TaskContext
import org.apache.spark.TaskContext

scala> myRDD.foreachPartition( e => { println(TaskContext.getPartitionId + ":" + e.mkString(",")) } )
0:
1:
2:Animal(1,Lion)
3:
4:Animal(2,Elephant)
5:
6:
7:Animal(3,Jaguar)
8:
9:Animal(4,Tiger)
10:
11:Animal(5,Chetah)
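If you want to avoid executor-side printing altogether (it is what caused the confusion below), you can bring the per-partition contents back to the driver instead. A sketch, assuming the same `myRDD`:

myRDD.mapPartitionsWithIndex { (idx, it) =>
  // Emit one (partition id, elements) pair per partition.
  Iterator((idx, it.toList))
}.collect().foreach { case (idx, elems) =>
  // Printing happens in a single driver thread, so lines cannot interleave.
  println(idx + ": " + elems.mkString(","))
}

On Tue, Mar 16, 2021 at 2:38 PM German Schiavon <gschiavonsp...@gmail.com> wrote:

> Hi all,
>
> I guess you could do something like this too:
>
> [image: Captura de pantalla 2021-03-16 a las 14.35.46.png]
>
> On Tue, 16 Mar 2021 at 13:31, Renganathan Mutthiah <renganatha...@gmail.com> wrote:
>
>> Hi Attila,
>>
>> Thanks for looking into this!
>>
>> I actually found the issue, and it turned out that the print statements
>> misled me. The records are indeed stored in different partitions.
>> What happened is that since the foreachPartition method is run in parallel
>> by different threads, they all printed their first line at almost the same
>> time, followed by the data, which was also printed at almost the same time.
>> This gave the appearance that all the data is stored in a single partition.
>> When I run the code below, I can see that the objects are of course stored
>> in different partitions!
>>
>> myRDD.mapPartitionsWithIndex( (index, itr) => { itr.foreach( e =>
>> println("Index : " + index + " " + e)); itr }, true).collect()
>>
>> It prints the following (the number after "Index :" is the partition number):
>> Index : 9 Animal(4,Tiger)
>> Index : 4 Animal(2,Elephant)
>> Index : 11 Animal(5,Chetah)
>> Index : 2 Animal(1,Lion)
>> Index : 7 Animal(3,Jaguar)
>>
>> Thanks!
>>
>> On Tue, Mar 16, 2021 at 3:06 PM Attila Zsolt Piros <piros.attila.zs...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> This is weird. The code of foreachPartition
>>> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L801-L806>
>>> leads to ParallelCollectionRDD
>>> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L84-L107>
>>> which ends in slice
>>> <https://github.com/apache/spark/blob/f643bd96593dc411cb0cca1c7a3f28f93765c9b6/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L116-L155>,
>>> where the most important part is the *positions* method:
>>>
>>> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
>>>   (0 until numSlices).iterator.map { i =>
>>>     val start = ((i * length) / numSlices).toInt
>>>     val end = (((i + 1) * length) / numSlices).toInt
>>>     (start, end)
>>>   }
>>> }
>>>
>>> Because of the extra '(' you used in "parallelize( (Array" I thought some
>>> Scala implicit might generate a Seq with one Array in it.
>>> But in that case your output would contain an Array, so this must not be
>>> the case.
>>>
>>> 1) What Spark/Scala version are you using? On what OS?
>>>
>>> 2) Can you reproduce this issue in the spark-shell?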
>>>
>>> scala> case class Animal(id:Int, name:String)
>>> defined class Animal
>>>
>>> scala> val myRDD = sc.parallelize( (Array( Animal(1, "Lion"),
>>> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"),
>>> Animal(5, "Chetah") ) ), 12)
>>> myRDD: org.apache.spark.rdd.RDD[Animal] = ParallelCollectionRDD[2] at
>>> parallelize at <console>:27
>>>
>>> scala> myRDD.foreachPartition( e => { println("----------");
>>> e.foreach(println) } )
>>> ----------
>>> ----------
>>> ----------
>>> Animal(1,Lion)
>>> ----------
>>> ----------
>>> Animal(2,Elephant)
>>> ----------
>>> ----------
>>> ----------
>>> Animal(3,Jaguar)
>>> ----------
>>> ----------
>>> Animal(4,Tiger)
>>> ----------
>>> ----------
>>> Animal(5,Chetah)
>>>
>>> scala> Console println myRDD.getNumPartitions
>>> 12
>>>
>>> 3) Can you please check in the spark-shell what happens when you paste
>>> the above method and call it like this:
>>>
>>> scala> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
>>>      |   (0 until numSlices).iterator.map { i =>
>>>      |     val start = ((i * length) / numSlices).toInt
>>>      |     val end = (((i + 1) * length) / numSlices).toInt
>>>      |     (start, end)
>>>      |   }
>>>      | }
>>> positions: (length: Long, numSlices: Int)Iterator[(Int, Int)]
>>>
>>> scala> positions(5, 12).foreach(println)
>>> (0,0)
>>> (0,0)
>>> (0,1)
>>> (1,1)
>>> (1,2)
>>> (2,2)
>>> (2,2)
>>> (2,3)
>>> (3,3)
>>> (3,4)
>>> (4,4)
>>> (4,5)
>>>
>>> As you can see, in my case the `positions` result is consistent with the
>>> `foreachPartition` output, and this should be deterministic.
>>>
>>> Best regards,
>>> Attila
>>>
>>> On Tue, Mar 16, 2021 at 5:34 AM Renganathan Mutthiah <renganatha...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a question about the default partitioning of an RDD.
>>>>
>>>> case class Animal(id:Int, name:String)
>>>> val myRDD = session.sparkContext.parallelize( (Array( Animal(1, "Lion"),
>>>> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"),
>>>> Animal(5, "Chetah") ) ))
>>>> Console println myRDD.getNumPartitions
>>>>
>>>> I am running the above piece of code on my laptop, which has 12 logical
>>>> cores, and hence I see that 12 partitions are created.
>>>>
>>>> My understanding is that hash partitioning is used to determine which
>>>> object needs to go to which partition, so in this case the formula would
>>>> be: hashCode() % 12.
>>>> But when I examine further, I see that all the objects are put in the
>>>> last partition.
>>>>
>>>> myRDD.foreachPartition( e => { println("----------"); e.foreach(println) } )
>>>>
>>>> The above code prints the following (the first eleven partitions are
>>>> empty and the last one has all the objects; the dashed lines separate
>>>> the partition contents):
>>>> ----------
>>>> ----------
>>>> ----------
>>>> ----------
>>>> ----------
>>>> ----------
>>>> ----------
>>>> ----------
>>>> ----------
>>>> ----------
>>>> ----------
>>>> ----------
>>>> Animal(2,Elephant)
>>>> Animal(4,Tiger)
>>>> Animal(3,Jaguar)
>>>> Animal(5,Chetah)
>>>> Animal(1,Lion)
>>>>
>>>> I don't know why this happens. Can you please help?
>>>>
>>>> Thanks!
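For reference, Attila's `positions` method can be pasted into a plain Scala REPL (no Spark needed) to see exactly which of the 12 slices receive the 5 elements; a standalone sketch:

def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
  (0 until numSlices).iterator.map { i =>
    val start = ((i * length) / numSlices).toInt
    val end = (((i + 1) * length) / numSlices).toInt
    (start, end)
  }
}

val data = Array("Lion", "Elephant", "Jaguar", "Tiger", "Chetah")

// Slice i holds the element indices [start, end); with 5 elements and
// 12 slices, most of the ranges are empty.
positions(data.length, 12).zipWithIndex.foreach { case ((start, end), i) =>
  println("slice " + i + ": " + data.slice(start, end).mkString(","))
}

This prints non-empty contents only for slices 2, 4, 7, 9 and 11, matching the partition ids seen in the outputs above.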