There seems to be a problem with what gets captured in the closure
that's passed to mapPartitions (myfunc in your case).

I've had a similar problem before:

http://apache-spark-user-list.1001560.n3.nabble.com/TaskNotSerializableException-when-running-through-Spark-shell-td16574.html

Try putting your myFunc in an object:

object Mapper {
  def myFunc[T](iter: Iterator[T]): Iterator[String] = ...
}
val r = sc.parallelize(c).mapPartitions(Mapper.myFunc).collect()
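
For reference, here's a self-contained sketch of that workaround (just my take, not tested against your setup; the Seq(1, 2, 3) input and the trivial function body stand in for your Array of DataPacket and your real logic):

// The function lives in a standalone object, so the closure handed to
// mapPartitions doesn't have to drag along the shell's enclosing line
// objects (which is where the non-serializable BufferedInputStream ends up).
object Mapper {
  def myFunc[T](iter: Iterator[T]): Iterator[String] =
    iter.map(_ => "")   // one empty string per element, like the original while loop
}

val r = sc.parallelize(Seq(1, 2, 3)).mapPartitions(Mapper.myFunc).collect()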

On Fri, Oct 17, 2014 at 7:33 AM, davidkl <davidkl...@hotmail.com> wrote:

> Hello,
>
> Maybe there is something I am not understanding, but I believe this code
> should not throw any serialization error when I run it in the Spark shell.
> Using similar code with map instead of mapPartitions works just fine.
>
> import java.io.BufferedInputStream
> import java.io.FileInputStream
> import com.testing.DataPacket
>
> val inStream = new BufferedInputStream(new FileInputStream("inputFile"))
> val p = new DataPacket(inStream)
> val c = Array(p)
> def myfunc[T](iter: Iterator[T]): Iterator[String] = {
>   var res = List[String]()
>   while (iter.hasNext)
>   {
>     val cur = iter.next
>     res ::= ""
>   }
>   res.iterator
> }
> var r = sc.parallelize(c).mapPartitions(myfunc).collect()
>
> This throws the following:
>
> org.apache.spark.SparkException: Task not serializable
>         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>         at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>         at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
>         at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
> ...
> ...
> Caused by: java.io.NotSerializableException: java.io.BufferedInputStream
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
> ...
> ...
>
> Why is this code failing? The constructor of DataPacket just reads data,
> but does not keep any reference to the BufferedInputStream. Note that this
> is not the real code, but a simplification written while trying to isolate
> the cause of the error I get. Using map on this instead of mapPartitions
> works fine.
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/bug-with-MapPartitions-tp16689.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
