I have had a quick look at the query from Maciej. I see one behaviour
when running the piece of code in spark-shell and a different one when
running it as a Spark app.
1. Running in the spark-shell, I see the serialization error that
Maciej reported.
2. Running the same code as a Spark app, I see different behaviour.
I have put the code below. It would be great if someone could explain
the difference in behaviour.
Thanks,
Kabeer.
------------------------------------------------
Spark-Shell:
scala> sc.stop
scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark._

val sc = new SparkContext(new SparkConf()
  .setAppName("bar")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))
println(sc.getConf.getOption("spark.serializer"))

val m = Map("a" -> 1, "b" -> 2)
val rdd5 = sc.makeRDD(Seq(m))
println("Map RDD is: ")

def mapFunc(input: Map[String, Int]): Unit =
  println(input.getOrElse("a", -2))

rdd5.map(mapFunc).collect()

// Exiting paste mode, now interpreting.
Some(org.apache.spark.serializer.KryoSerializer)
Map RDD is:
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
-------------------------------------------------------------------------------------------------
Scenario 2:
Code:
package experiment

import org.apache.spark._

object Serialization1 extends App {
  val sc = new SparkContext(new SparkConf()
    .setAppName("bar")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .setMaster("local[1]"))
  println(sc.getConf.getOption("spark.serializer"))

  val m = Map("a" -> 1, "b" -> 2)
  val rdd5 = sc.makeRDD(Seq(m))
  println("Map RDD is: ")

  def mapFunc(input: Map[String, Int]): Unit =
    println(input.getOrElse("a", -2))

  rdd5.map(mapFunc).collect()
}
Run command:
spark-submit --class experiment.Serialization1 \
  target/scala-2.10/learningspark_2.10-0.1-SNAPSHOT.jar
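[As a side note, one commonly suggested way to sidestep "Task not
serializable" in the shell is to define the function as a val function
literal rather than a def, so the closure shipped to executors does not
drag in the enclosing REPL line object (which holds the non-serializable
SparkContext). A sketch only, assuming the same shell session as above;
whether it resolves this particular error may depend on the Spark/REPL
version:]

```scala
// Same session as the paste above, but mapFunc as a val function literal.
// The lambda's body references only its own parameter, so the
// ClosureCleaner has no enclosing REPL object to try to serialize.
val mapFunc: Map[String, Int] => Unit =
  input => println(input.getOrElse("a", -2))

rdd5.map(mapFunc).collect()
```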
---------------------------------------------------------------------------------------------------------------
On Thu, 29 Sep, 2016 at 1:05 AM, Jakob Odersky <ja...@odersky.com>
wrote:
I agree with Sean's answer; you can check out the relevant serializer
here:
https://github.com/twitter/chill/blob/develop/chill-scala/src/main/scala/com/twitter/chill/Traversable.scala
On Wed, Sep 28, 2016 at 3:11 AM, Sean Owen <so...@cloudera.com> wrote:
My guess is that Kryo specially handles Maps generically or relies on
some mechanism that does, and it happens to iterate over all
key/values as part of that and of course there aren't actually any
key/values in the map. The Java serialization is a much more literal
(expensive) field-by-field serialization which works here because
there's no special treatment. I think you could register a custom
serializer that handles this case. Or work around it in your client
code. I know there have been other issues with Kryo and Map because,
for example, sometimes a Map in an application is actually some
non-serializable wrapper view.
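[A minimal sketch of the client-code workaround Sean mentions: stop
relying on withDefaultValue surviving serialization and apply the
fallback explicitly at the lookup site. Untested against a real cluster;
assumes the same session as Maciej's example below:]

```scala
val aMap = Map[String, Long]().withDefaultValue(0L)

// getOrElse works whether or not Kryo preserved the map's default,
// because the fallback value travels inside the closure itself
// rather than inside the serialized Map.
sc.parallelize(Seq(aMap)).map(_.getOrElse("a", 0L)).first
```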
On Wed, Sep 28, 2016 at 3:18 AM, Maciej Szymkiewicz
<mszymkiew...@gmail.com> wrote:
Hi everyone,
I suspect there is no point in submitting a JIRA to fix this (not a
Spark issue?) but I would like to know if this problem is documented
anywhere. Somehow Kryo is losing the default value during
serialization:
scala> import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.{SparkContext, SparkConf}
scala> val aMap = Map[String, Long]().withDefaultValue(0L)
aMap: scala.collection.immutable.Map[String,Long] = Map()
scala> aMap("a")
res6: Long = 0
scala> val sc = new SparkContext(new SparkConf()
  .setAppName("bar")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))

scala> sc.parallelize(Seq(aMap)).map(_("a")).first
16/09/28 09:13:47 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 7)
java.util.NoSuchElementException: key not found: a
while Java serializer works just fine:
scala> val sc = new SparkContext(new SparkConf()
  .setAppName("bar")
  .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))
scala> sc.parallelize(Seq(aMap)).map(_("a")).first
res9: Long = 0
--
Best regards,
Maciej
---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org