Hello,

Maybe there is something I am not understanding, but I believe this code
should not throw a serialization error when I run it in the spark shell.
Similar code that uses map instead of mapPartitions works just fine.

import java.io.BufferedInputStream
import java.io.FileInputStream
import com.testing.DataPacket

val inStream = new BufferedInputStream(new FileInputStream("inputFile"))
val p = new DataPacket(inStream)
val c = Array(p)
def myfunc[T](iter: Iterator[T]): Iterator[String] = {
  var res = List[String]()
  while (iter.hasNext) {
    val cur = iter.next()   // consume the element; its value is not used here
    res ::= ""              // prepend an empty string for each element
  }
  res.iterator
}
val r = sc.parallelize(c).mapPartitions(myfunc).collect()

This throws the following:

org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
...
...
Caused by: java.io.NotSerializableException: java.io.BufferedInputStream
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
...
...

Why is this code failing? The constructor of DataPacket just reads data, but
does not keep any reference to the BufferedInputStream. Note that this is
not the real code, but a simplification I made while trying to isolate the
cause of the error I get. Using map on this instead of mapPartitions works
fine.
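
To illustrate what I mean, the shape of DataPacket is roughly the following.
This is only a sketch, not the real class; the field name is made up, and I
am assuming here that DataPacket itself is serializable:

import java.io.BufferedInputStream

// Sketch only: the constructor reads from the stream, but the class keeps
// no reference to the stream afterwards.
class DataPacket(in: BufferedInputStream) extends Serializable {
  val firstByte: Int = in.read()   // made-up field, just to show data being read
}

And the map-based variant that works for me is along these lines:

val r2 = sc.parallelize(c).map(x => "").collect()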