On Mon, Jul 28, 2014 at 12:58 PM, lllll <lishu...@gmail.com> wrote:
> I have a file in S3 that I want to map each line to an index. Here is my
> code:
>
>>>> input_data = sc.textFile('s3n://myinput', minPartitions=6).cache()
>>>> N = input_data.count()
>>>> index = sc.parallelize(range(N), 6)
>>>> index.zip(input_data).collect()
I don't think you can implement zipWithIndex() this way: zip() requires both RDDs to have the same number of elements in each partition, and the partitions of input_data will not line up with those of index. You need to get the exact number of lines in each partition first, then generate the correct indices from those counts. That is easy to do with mapPartitions():

>>> nums = input_data.mapPartitions(lambda it: [sum(1 for i in it)]).collect()
>>> starts = [sum(nums[:i]) for i in range(len(nums))]
>>> zipped = input_data.mapPartitionsWithIndex(
...     lambda i, it: ((starts[i] + j, x) for j, x in enumerate(it)))

> ...
> 14/07/28 19:49:31 INFO DAGScheduler: Completed ResultTask(18, 4)
> 14/07/28 19:49:31 INFO DAGScheduler: Stage 18 (collect at <stdin>:1)
> finished in 0.031 s
> 14/07/28 19:49:31 INFO SparkContext: Job finished: collect at <stdin>:1,
> took 0.039999707 s
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/root/spark/python/pyspark/rdd.py", line 584, in collect
>     return list(self._collect_iterator_through_file(bytesInJava))
>   File "/root/spark/python/pyspark/rdd.py", line 592, in _collect_iterator_through_file
>     self.ctx._writeToFile(iterator, tempFile.name)
>   File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
>   File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.writeToFile.
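The offset computation above can be sketched outside of Spark as well. This is a minimal illustration of the same three steps, using plain Python lists as made-up stand-ins for the RDD partitions (no Spark cluster needed):

```python
# Stand-ins for the partitions of an RDD (hypothetical example data).
partitions = [["a", "b"], ["c"], ["d", "e", "f"]]

# Step 1: count the elements in each partition
# (what mapPartitions + collect does above).
nums = [sum(1 for _ in part) for part in partitions]   # [2, 1, 3]

# Step 2: cumulative starting index for each partition.
starts = [sum(nums[:i]) for i in range(len(nums))]     # [0, 2, 3]

# Step 3: pair each element with its global index
# (what mapPartitionsWithIndex does above).
zipped = [(starts[i] + j, x)
          for i, part in enumerate(partitions)
          for j, x in enumerate(part)]
print(zipped)
# [(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (4, 'e'), (5, 'f')]
```

The key point is that each partition only needs its own starting offset, so the indexing itself stays fully parallel; only the small list of per-partition counts is collected to the driver.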
> : java.lang.ClassCastException: java.lang.String cannot be cast to [B
>     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:312)
>     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:309)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:309)
>     at org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:342)
>     at org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:337)
>     at org.apache.spark.api.python.PythonRDD.writeToFile(PythonRDD.scala)
>     at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>     at py4j.Gateway.invoke(Gateway.java:259)
>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at py4j.GatewayConnection.run(GatewayConnection.java:207)
>     at java.lang.Thread.run(Thread.java:744)
>
> As I see it, the job completed, but I don't understand what 'String cannot
> be cast to [B' means. I tried to zip two ParallelCollectionRDDs and it
> works fine, but here I have a MappedRDD from textFile. Not sure what's
> going on here.

Could you provide a script and dataset that reproduce this error? Maybe there are some corner cases during serialization.

> Also, why does PySpark not have zipWithIndex()?

PySpark still has far fewer features than the Scala API; hopefully it will catch up over the next two releases.

> Thanks for any help.
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/zip-two-RDD-in-pyspark-tp10806.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.