Hi Thierry,
Your code does not work if @yh18190 wants a global counter. An RDD may have
more than one partition, and the closure passed to map is shipped to each
task as its own copy, so cnt restarts at -1 for every partition. You can try
the following code:
scala> val rdd = sc.parallelize((1, 'a') :: (2, 'b') :: (3, 'c') :: (4, 'd') :: Nil)
rdd: org.apache.spark.rdd.RDD[(Int, Char)] = ParallelCollectionRDD[3] at
parallelize at <console>:12

scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner

scala> val rdd2 = rdd.partitionBy(new HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, Char)] = ShuffledRDD[4] at partitionBy
at <console>:18

scala> var cnt = -1
cnt: Int = -1

scala> val rdd3 = rdd2.map(i => {cnt += 1; (cnt, i)})
rdd3: org.apache.spark.rdd.RDD[(Int, (Int, Char))] = MappedRDD[5] at map at
<console>:22

scala> rdd3.collect
res2: Array[(Int, (Int, Char))] = Array((0,(2,b)), (1,(4,d)), (0,(1,a)),
(1,(3,c)))
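Note that the indices 0 and 1 each appear twice, once per partition. The
driver's copy of cnt is never updated either, since each task increments its
own deserialized copy. A quick check (my addition, not from the original
session):

rdd3.count()
println(cnt)  // still prints -1 on the driver: tasks mutate their own copies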
A workaround is using rdd.partitionBy(new HashPartitioner(1)) to make sure
there is only one partition, but that's not efficient for big input.
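If collapsing to one partition is too slow, here is a rough sketch of an
alternative (my own, untested here; it assumes mapPartitionsWithIndex is
available in your Spark version): build a global index in two passes, by
first collecting the element count of each partition and then adding each
partition's starting offset to a local index.

// Pass 1: count elements per partition (only the small counts reach the driver).
val counts = rdd.mapPartitionsWithIndex((pid, it) => Iterator((pid, it.size)))
  .collect().sortBy(_._1).map(_._2.toLong)
// Starting global index of each partition.
val offsets = counts.scanLeft(0L)(_ + _)
// Pass 2: tag each element with its partition's offset plus a local index.
val indexed = rdd.mapPartitionsWithIndex { (pid, it) =>
  it.zipWithIndex.map { case (x, i) => (offsets(pid) + i, x) }
}

This never shuffles the data itself. Note it evaluates rdd twice, so cache
it first if recomputation is expensive.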
Best Regards,
Shixiong Zhu
2014-04-02 11:10 GMT+08:00 Thierry Herrmann thierry.herrm...@gmail.com:
I'm new to Spark, but isn't this a pure Scala question?
The following seems to work with the spark shell:
$ spark-shell
scala> val rdd = sc.makeRDD(List(10,20,30))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at makeRDD at
<console>:12

scala> var cnt = -1
cnt: Int = -1

scala> val rdd2 = rdd.map(i => {cnt += 1; (cnt, i)})
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = MappedRDD[9] at map at
<console>:16

scala> rdd2.collect
res8: Array[(Int, Int)] = Array((0,10), (1,20), (2,30))
Thierry