There seems to be a problem with what gets captured in the closure that is passed to mapPartitions (myfunc in your case).
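As far as I know, the shell compiles every input line into a generated wrapper object, and a function defined at the top level of the shell closes over that wrapper. Serializing the closure then drags in everything else defined alongside it, including your inStream val, and java.io.BufferedInputStream is not serializable. Defining the function inside a standalone object breaks that link. Here is a minimal sketch of the workaround to paste into the shell (Mapper is just a name I picked, and the body is my reading of your myfunc as quoted below):

    object Mapper {
      // Lives in its own object, so shipping this function to the
      // executors does not pull in the REPL line that holds inStream.
      def myFunc[T](iter: Iterator[T]): Iterator[String] = {
        var res = List[String]()
        while (iter.hasNext) {
          iter.next()  // consume and discard the element
          res ::= ""   // emit one (empty) string per input element
        }
        res.iterator
      }
    }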
I've had a similar problem before:
http://apache-spark-user-list.1001560.n3.nabble.com/TaskNotSerializableException-when-running-through-Spark-shell-td16574.html

Try putting your myFunc in an object, as sketched above:

    object Mapper { def myFunc = ... }
    val r = sc.parallelize(c).mapPartitions(Mapper.myFunc).collect()

On Fri, Oct 17, 2014 at 7:33 AM, davidkl <davidkl...@hotmail.com> wrote:
> Hello,
>
> Maybe there is something I do not understand, but I believe this code
> should not throw any serialization error when I run it in the spark
> shell. Similar code using map instead of mapPartitions works just fine.
>
> import java.io.BufferedInputStream
> import java.io.FileInputStream
> import com.testing.DataPacket
>
> val inStream = new BufferedInputStream(new FileInputStream("inputFile"))
> val p = new DataPacket(inStream)
> val c = Array(p)
>
> def myfunc[T](iter: Iterator[T]): Iterator[String] = {
>   var res = List[String]()
>   while (iter.hasNext) {
>     val cur = iter.next
>     res ::= ""
>   }
>   res.iterator
> }
>
> val r = sc.parallelize(c).mapPartitions(myfunc).collect()
>
> This throws the following:
>
> org.apache.spark.SparkException: Task not serializable
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
>   at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
>   ...
> Caused by: java.io.NotSerializableException: java.io.BufferedInputStream
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>   ...
>
> Why is this code failing? The constructor of DataPacket just reads data,
> but does not keep any reference to the BufferedInputStream. Note that
> this is not the real code, but a simplification made while trying to
> isolate the cause of the error. Using map on this instead of
> mapPartitions works fine.