Within a task, a partition in Spark is exposed as an Iterator[T]. For some purposes I want to treat each partition not as an Iterator but as one whole object, for example to turn an Iterator[Int] into a breeze.linalg.DenseVector[Int]. I use the 'mapPartitions' API to achieve this, but during the implementation I observed some confusing behavior.
I use Spark 1.3.0 on a cluster of 10 executor nodes; below are my different attempts:

import breeze.linalg.DenseVector

val a = sc.parallelize(1 to 100, 10)

val b = a.mapPartitions { iter =>
  val v = Array.ofDim[Int](iter.size)
  var ind = 0
  while (iter.hasNext) {
    v(ind) = iter.next()
    ind += 1
  }
  println(v.mkString(","))
  Iterator.single[DenseVector[Int]](DenseVector(v))
}
b.count()

val c = a.mapPartitions { iter =>
  val v = Array.ofDim[Int](iter.size)
  iter.copyToArray(v, 0, 10)
  println(v.mkString(","))
  Iterator.single[DenseVector[Int]](DenseVector(v))
}
c.count()

val d = a.mapPartitions { iter =>
  val v = iter.toArray
  println(v.mkString(","))
  Iterator.single[DenseVector[Int]](DenseVector(v))
}
d.count()

I can see the printed output in each executor's stdout. Only attempt 'd' gives the result I want; attempts 'b' and 'c' produce an all-zero DenseVector, which means the values were never copied from the iterator into the array.
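To narrow this down, here is the same pattern as attempt 'b' on a plain Scala Iterator outside Spark. It reproduces the all-zero array, which makes me suspect that calling iter.size already consumes the iterator (the object name below is mine, just for the repro):

```scala
// Standalone reproduction of attempt 'b' without Spark.
object IteratorSizeRepro {
  def main(args: Array[String]): Unit = {
    val iter = (1 to 10).iterator
    // size traverses the iterator to count its elements,
    // leaving it exhausted afterwards.
    val v = Array.ofDim[Int](iter.size)
    var ind = 0
    while (iter.hasNext) { // hasNext is now false, so the loop never runs
      v(ind) = iter.next()
      ind += 1
    }
    println(v.mkString(",")) // prints 0,0,0,0,0,0,0,0,0,0
    println(iter.hasNext)    // prints false
  }
}
```

If that is the cause, attempt 'c' fails for the same reason: by the time copyToArray runs, the iterator is already empty.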

I would appreciate an explanation of these observations.
Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Looking-inside-the-mapPartitions-transformation-some-confused-observations-tp22850.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
